weather_api/utils.py
Argiris Deligiannidis 46a5b5c039 Update code base
2024-04-07 13:01:31 +03:00

193 lines
7.1 KiB
Python

from models import Location
from db_connector import database,engine,db_session
import pandas as pd
import openmeteo_requests
import requests_cache
from retry_requests import retry
def initialize_database():
"""
A function to initialize the database by checking table availability and creating it if it does not exist.
"""
db_tables = ['locations']
# check table availability and create it if it does not exist
for tb in db_tables:
if not engine.dialect.has_table(engine.connect(), tb):
print("Creating table: {}\n".format(tb))
database.metadata.create_all(engine)
table_data = pd.read_csv ('./table_data/locations.csv', index_col=None, header=0)
for i in range(len(table_data)):
add_location(Location(name=table_data.loc[i, "Capital City"],
latitude=float(table_data.loc[i, "Latitude"]),
longitude=float(table_data.loc[i, "Longitude"]),
),
no_commit=True
)
db_session.commit()
def add_location(location:Location , no_commit=False):
"""
A function that adds a location to the database session.
Parameters:
location (Location): The location object to be added to the database session.
no_commit (bool): Flag indicating whether to commit the transaction immediately.
Returns:
None
"""
db_session.add(location)
#print("Adding location: {}".format(location))
if not no_commit:
db_session.commit()
def delete_location(id):
"""
Deletes a location from the database based on the provided ID.
Parameters:
id (int): The ID of the location to be deleted.
Returns:
dict: A dictionary with the key "id" indicating that the location was successfully deleted.
"""
db_session.query(Location).filter(Location.id == id).delete()
db_session.commit()
return {"id": "Deleted"}
def chunkify_data(data, chunk_size):
"""
A function that chunks the input data into smaller pieces of the specified chunk size.
Parameters:
- data: the input data to be chunked
- chunk_size: the size of each chunk
Returns:
- A generator that yields chunks of the input data
"""
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
def retrieve_weather_data(location_id=None):
"""
A function to retrieve weather data based on location IDs.
Parameters:
- location_id: an optional parameter to specify the location ID(s) to retrieve weather data for. If 'all' is provided, data for all locations is returned.
Returns:
- If location_id is not 'all' and any specified location ID does not exist, a dictionary of invalid location IDs is returned.
- If location_id is None, None is returned.
- If location_id is 'all', weather data for all locations is returned in a dictionary.
- Otherwise, weather data for the specified location IDs is returned in a dictionary.
"""
print('xaz')
max_id = db_session.query(Location).count()
#NOTE: Disable location check if location_id is 'all' (returns all locations), debugging purposes
if location_id != 'all':
loc_check = {}
for loc_id in location_id:
if loc_id > max_id:
loc_check[loc_id] = "The location does not exist"
print(len(loc_check))
if len(loc_check) > 0:
return {'error': loc_check}
print('az')
weather_data = {}
#NOTE: Get weather data for all locations if location_id is 'all' (debugging purposes), otherwise get weather data for specified locations
if location_id == 'all':
locations = list(chunkify_data(db_session.query(Location).all(),100))
else:
locations = list(chunkify_data(db_session.query(Location).filter(Location.id.in_(location_id)).all(),100))
print(locations)
for chunk in locations:
coordinates = [[loc.id, loc.latitude, loc.longitude, loc.name] for loc in chunk]
weather_data.update(get_openmeteo_data(coordinates))
#print(weather_data)
#exit(0)
return weather_data
def get_openmeteo_data(coordinates):
"""
Retrieves weather data from the OpenMeteo API for the given coordinates.
Parameters:
coordinates (list): List of tuples containing latitude, longitude, and location name.
Returns:
dict: A dictionary containing the retrieved weather data for each location.
"""
data_dict = {}
cache_session = requests_cache.CachedSession('.cache', expire_after = 3600)
retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2)
openmeteo_client = openmeteo_requests.Client(session = retry_session)
latitude = [coords[1] for coords in coordinates]
longitude = [coords[2] for coords in coordinates]
# NOTE: Uncomment to get all available weather data for each location
# data_names = [
# "weather_code", "temperature_2m_max", "temperature_2m_min",
# "apparent_temperature_max", "apparent_temperature_min",
# "sunrise", "sunset", "daylight_duration", "sunshine_duration",
# "uv_index_max", "uv_index_clear_sky_max", "precipitation_sum",
# "rain_sum", "showers_sum", "snowfall_sum", "precipitation_hours",
# "precipitation_probability_max", "wind_speed_10m_max",
# "wind_gusts_10m_max", "wind_direction_10m_dominant",
# "shortwave_radiation_sum", "et0_fao_evapotranspiration"
# ]
data_names = ["temperature_2m_max", "temperature_2m_min", "precipitation_sum"]
#NOTE: OpenMeteo API has a limit of 10000 requests per day,
# so in a production environment it would be wise to change to
# an enterprise account OR utilize the clients for the requests
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": latitude,
"longitude": longitude,
"daily": data_names
}
responses = openmeteo_client.weather_api(url, params=params)
for idx, response in enumerate(responses, start=1):
location = coordinates[idx-1][3]
idx = coordinates[idx-1][0]
daily = response.Daily()
daily_data = { variable: daily.Variables(i).ValuesAsNumpy().tolist()
if variable not in ["sunrise", "sunset"]
else daily.Variables(i).ValuesAsNumpy()
for i, variable in enumerate(data_names)
}
dates = pd.date_range(start = pd.to_datetime(daily.Time(), unit = "s", utc = True),
end = pd.to_datetime(daily.TimeEnd(), unit = "s", utc = True),
freq = pd.Timedelta(seconds = daily.Interval()),
inclusive = "left").tolist()
daily_data["date"] = [date.strftime("%d/%m/%Y") for date in dates]
daily_data["location_name"] = location
daily_data["resp_coordinates"] = [response.Latitude(), response.Longitude()]
data_dict[idx] = daily_data
return data_dict