309 lines
11 KiB
Python
309 lines
11 KiB
Python
from models import Location, Users, Config
|
|
from db_connector import database,engine,db_session
|
|
import pandas as pd
|
|
import openmeteo_requests
|
|
import requests_cache
|
|
import requests
|
|
from retry_requests import retry
|
|
|
|
|
|
def initialize_database():
|
|
"""
|
|
A function to initialize the database by checking table availability and creating it if it does not exist.
|
|
"""
|
|
db_session.add(Users(name="Argiris Deligiannidis",email="mai@argideli.com"))
|
|
|
|
db_tables = ['locations']
|
|
# check table availability and create it if it does not exist
|
|
for tb in db_tables:
|
|
if not engine.dialect.has_table(engine.connect(), tb):
|
|
print("\t*** Creating tables ***")
|
|
database.metadata.create_all(engine)
|
|
|
|
table_data = pd.read_csv ('./table_data/locations.csv', index_col=None, header=0)
|
|
for idx in range(len(table_data)):
|
|
location = {
|
|
'id': idx+1,
|
|
'name': table_data.loc[idx, "Capital City"],
|
|
'country': table_data.loc[idx, "Country"],
|
|
'latitude': float(table_data.loc[idx, "Latitude"]),
|
|
'longitude': float(table_data.loc[idx, "Longitude"]),
|
|
'user': 'bootstrap',
|
|
}
|
|
|
|
add_location(location, no_commit=True)
|
|
db_session.commit()
|
|
|
|
def get_database_locations(user=None):
|
|
"""
|
|
Retrieve locations from the database for a specified user, or all locations if no user is specified.
|
|
"""
|
|
if user is not None:
|
|
locations = db_session.query(Config).filter(Config.user_id == user).all()
|
|
else:
|
|
locations = db_session.query(Location).all()
|
|
|
|
|
|
return locations
|
|
|
|
def get_max_id():
|
|
"""
|
|
A function to retrieve the maximum ID from the Location table in the database.
|
|
"""
|
|
return max([id[0] for id in db_session.query(Location.id).all()])
|
|
|
|
def get_available_ids(id_num):
|
|
"""
|
|
Function to generate a list of available IDs based on existing IDs in the database.
|
|
Parameters:
|
|
id_num (int): The number of IDs to generate.
|
|
Returns:
|
|
List[int]: List of available IDs.
|
|
"""
|
|
db_ids = [id[0] for id in db_session.query(Location.id).all()]
|
|
avail_ids = [loc for loc in range(max(db_ids)+1) if loc not in db_ids and loc != 0]
|
|
|
|
for i in range(id_num-len(avail_ids)):
|
|
if avail_ids != []:
|
|
avail_ids.append(max(avail_ids)+1)
|
|
else:
|
|
avail_ids.append(max(db_ids)+1)
|
|
|
|
return avail_ids
|
|
|
|
def add_location(location, no_commit=False):
|
|
"""
|
|
A function that adds a location to the database session.
|
|
|
|
Parameters:
|
|
location (Location): The location object to be added to the database session.
|
|
no_commit (bool): Flag indicating whether to commit the transaction immediately.
|
|
"""
|
|
if location["name"] != 'existing':
|
|
db_session.add(Location(id=location["id"],
|
|
name=location["name"],
|
|
country=location["country"],
|
|
latitude=location["latitude"],
|
|
longitude=location["longitude"],
|
|
)
|
|
)
|
|
if not no_commit:
|
|
db_session.commit()
|
|
|
|
if location["user"] != 'bootstrap':
|
|
db_session.add(Config(user_id=location["user"],location_id=location["id"]))
|
|
no_commit == False
|
|
|
|
|
|
def config_disable_location(id, user):
|
|
"""
|
|
A function that disables a location configuration based on the provided ID and user.
|
|
|
|
Parameters:
|
|
id (int): The ID of the location to be disabled.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
db_session.query(Config).filter(Config.location_id == id and Config.user_id == user).delete()
|
|
db_session.commit()
|
|
|
|
def delete_location(id):
|
|
"""
|
|
Deletes a location from the database based on the provided ID.
|
|
|
|
Parameters:
|
|
id (int): The ID of the location to be deleted.
|
|
|
|
Returns:
|
|
dict: A dictionary with the key "id" indicating that the location was successfully deleted.
|
|
"""
|
|
db_session.query(Config).filter(Config.location_id == id).delete()
|
|
db_session.commit()
|
|
db_session.query(Location).filter(Location.id == id).delete()
|
|
db_session.commit()
|
|
|
|
return {"id": "Deleted"}
|
|
|
|
|
|
def chunkify_data(data, chunk_size):
|
|
"""
|
|
A function to split data into chunks of a specified size for processing.
|
|
|
|
Parameters:
|
|
- data: The input data to be chunked.
|
|
- chunk_size: The size of each chunk to split the data into.
|
|
|
|
Returns:
|
|
- A generator that yields chunks of the data based on the specified chunk size.
|
|
"""
|
|
#NOTE bulk operation: Open Weather api has an upper limit of ~= 180 parameters for a request per second
|
|
# so we will split the requests into chunks of 100 parameters
|
|
for i in range(0, len(data), chunk_size):
|
|
yield data[i:i + chunk_size]
|
|
|
|
|
|
def retrieve_weather_data(location_id=None):
|
|
"""
|
|
A function to retrieve weather data based on location IDs.
|
|
|
|
Parameters:
|
|
- location_id: an optional parameter to specify the location ID(s) to retrieve weather data for. If 'all' is provided, data for all locations is returned.
|
|
|
|
Returns:
|
|
- If location_id is not 'all' and any specified location ID does not exist, a dictionary of invalid location IDs is returned.
|
|
- If location_id is None, None is returned.
|
|
- If location_id is 'all', weather data for all locations is returned in a dictionary.
|
|
- Otherwise, weather data for the specified location IDs is returned in a dictionary.
|
|
"""
|
|
max_id = get_max_id()
|
|
|
|
#NOTE: Disable location check if location_id is 'all' (returns all locations), debugging purposes
|
|
if location_id != 'all':
|
|
loc_check = {}
|
|
for loc_id in location_id:
|
|
if loc_id > max_id:
|
|
loc_check[loc_id] = "The location does not exist"
|
|
if len(loc_check) > 0:
|
|
return {'error': loc_check}
|
|
|
|
weather_data = {}
|
|
|
|
#NOTE: Get weather data for all locations if location_id is 'all' (debugging purposes), otherwise get weather data for specified locations
|
|
if location_id == 'all':
|
|
locations = list(chunkify_data(db_session.query(Location).all(),100))
|
|
else:
|
|
locations = list(chunkify_data(db_session.query(Location).filter(Location.id.in_(location_id)).all(),100))
|
|
|
|
|
|
for chunk in locations:
|
|
coordinates = []
|
|
for i in range(len(chunk)):
|
|
ids = get_available_ids(len(chunk))
|
|
coordinates.append([chunk[i].id, chunk[i].latitude, chunk[i].longitude, chunk[i].name])
|
|
weather_data.update(get_openmeteo_data(coordinates))
|
|
|
|
return weather_data
|
|
|
|
def get_openmeteo_data(coordinates):
|
|
"""
|
|
Retrieves weather data from the OpenMeteo API for the given coordinates.
|
|
|
|
Parameters:
|
|
coordinates (list): List of tuples containing latitude, longitude, and location name.
|
|
|
|
Returns:
|
|
dict: A dictionary containing the retrieved weather data for each location.
|
|
"""
|
|
|
|
data_dict = {}
|
|
cache_session = requests_cache.CachedSession('.cache', expire_after = 3600)
|
|
retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2)
|
|
openmeteo_client = openmeteo_requests.Client(session = retry_session)
|
|
|
|
latitude = [coords[1] for coords in coordinates]
|
|
longitude = [coords[2] for coords in coordinates]
|
|
|
|
# NOTE: Uncomment to get all available weather data for each location
|
|
# data_names = [
|
|
# "weather_code", "temperature_2m_max", "temperature_2m_min",
|
|
# "apparent_temperature_max", "apparent_temperature_min",
|
|
# "sunrise", "sunset", "daylight_duration", "sunshine_duration",
|
|
# "uv_index_max", "uv_index_clear_sky_max", "precipitation_sum",
|
|
# "rain_sum", "showers_sum", "snowfall_sum", "precipitation_hours",
|
|
# "precipitation_probability_max", "wind_speed_10m_max",
|
|
# "wind_gusts_10m_max", "wind_direction_10m_dominant",
|
|
# "shortwave_radiation_sum", "et0_fao_evapotranspiration"
|
|
# ]
|
|
|
|
current_names = ["weather_code", "temperature_2m", "rain", "precipitation", "showers"]
|
|
data_names = ["weather_code", "temperature_2m_max", "temperature_2m_min", "rain_sum"]
|
|
|
|
#NOTE: OpenMeteo API has a limit of 10000 requests per day,
|
|
# so in a production environment it would be wise to change to
|
|
# an enterprise account OR utilize the clients for the requests
|
|
url = "https://api.open-meteo.com/v1/forecast"
|
|
params = {
|
|
"latitude": latitude,
|
|
"longitude": longitude,
|
|
"current": current_names,
|
|
"daily": data_names
|
|
}
|
|
|
|
responses = openmeteo_client.weather_api(url, params=params)
|
|
|
|
for idx, response in enumerate(responses, start=1):
|
|
location = coordinates[idx-1][3]
|
|
idx = coordinates[idx-1][0]
|
|
|
|
current = response.Current()
|
|
|
|
#set rain to max of the available openmeteo result variables
|
|
max_rain = max([float(current.Variables(2).Value()),
|
|
float(current.Variables(3).Value()),
|
|
float(current.Variables(3).Value())])
|
|
|
|
current_data = {"weather_code": current.Variables(0).Value(), "temperature_2m": current.Variables(1).Value(), "rain": max_rain}
|
|
|
|
daily = response.Daily()
|
|
daily_data = {}
|
|
daily_data.update({ variable: daily.Variables(i).ValuesAsNumpy().tolist()
|
|
if variable not in ["sunrise", "sunset"]
|
|
else daily.Variables(i).ValuesAsNumpy()
|
|
for i, variable in enumerate(data_names)
|
|
})
|
|
|
|
dates = pd.date_range(start = pd.to_datetime(daily.Time(), unit = "s", utc = True),
|
|
end = pd.to_datetime(daily.TimeEnd(), unit = "s", utc = True),
|
|
freq = pd.Timedelta(seconds = daily.Interval()),
|
|
inclusive = "left").tolist()
|
|
|
|
daily_data["date"] = [date.strftime("%d/%m/%Y") for date in dates]
|
|
|
|
location_data = {
|
|
"id": idx,
|
|
"name": location,
|
|
"coordinates": [response.Latitude(), response.Longitude()]
|
|
}
|
|
|
|
data_dict[idx] = {
|
|
"id": idx,
|
|
"data": {
|
|
"location": location_data,
|
|
"current": current_data,
|
|
"daily": daily_data
|
|
}
|
|
}
|
|
|
|
return data_dict
|
|
|
|
def search_location(query):
|
|
"""
|
|
Retrieve location data based on the provided query string.
|
|
|
|
Parameters:
|
|
query (str): The query string used to search for locations.
|
|
|
|
Returns:
|
|
A dictionary containing location data with an index as the key and location information as the value.
|
|
"""
|
|
|
|
URL="https://geocoding-api.open-meteo.com/v1/search"
|
|
PARAMS = {
|
|
"name": query,
|
|
"count": 10,
|
|
"language": 'en',
|
|
"format": 'json'
|
|
}
|
|
|
|
response = requests.get(url = URL, params = PARAMS)
|
|
data_dict={}
|
|
try:
|
|
for idx,d in enumerate(response.json()['results']):
|
|
data_dict[idx] = d
|
|
data_dict[idx].update({'selected': False})
|
|
except KeyError:
|
|
data_dict[0] = {'result': 'Error'}
|
|
|
|
return data_dict
|
|
|