from models import Location, Users, Config from db_connector import database,engine,db_session import pandas as pd import openmeteo_requests import requests_cache import requests from retry_requests import retry def initialize_database(): """ A function to initialize the database by checking table availability and creating it if it does not exist. """ db_session.add(Users(name="Argiris Deligiannidis",email="mai@argideli.com")) db_tables = ['locations'] # check table availability and create it if it does not exist for tb in db_tables: if not engine.dialect.has_table(engine.connect(), tb): print("\t*** Creating tables ***") database.metadata.create_all(engine) table_data = pd.read_csv ('./table_data/locations.csv', index_col=None, header=0) for idx in range(len(table_data)): location = { 'id': idx+1, 'name': table_data.loc[idx, "Capital City"], 'country': table_data.loc[idx, "Country"], 'latitude': float(table_data.loc[idx, "Latitude"]), 'longitude': float(table_data.loc[idx, "Longitude"]), 'user': 'bootstrap', } add_location(location, no_commit=True) db_session.commit() def get_database_locations(user=None): """ Retrieve locations from the database for a specified user, or all locations if no user is specified. """ if user is not None: locations = db_session.query(Config).filter(Config.user_id == user).all() else: locations = db_session.query(Location).all() return locations def get_max_id(): """ A function to retrieve the maximum ID from the Location table in the database. """ return max([id[0] for id in db_session.query(Location.id).all()]) def get_available_ids(id_num): """ Function to generate a list of available IDs based on existing IDs in the database. Parameters: id_num (int): The number of IDs to generate. Returns: List[int]: List of available IDs. """ db_ids = [id[0] for id in db_session.query(Location.id).all()] avail_ids = [loc for loc in range(max(db_ids)+1) if loc not in db_ids and loc != 0] for i in range(id_num-len(avail_ids)): if avail_ids != []: avail_ids.append(max(avail_ids)+1) else: avail_ids.append(max(db_ids)+1) return avail_ids def add_location(location, no_commit=False): """ A function that adds a location to the database session. Parameters: location (Location): The location object to be added to the database session. no_commit (bool): Flag indicating whether to commit the transaction immediately. """ if location["name"] != 'existing': db_session.add(Location(id=location["id"], name=location["name"], country=location["country"], latitude=location["latitude"], longitude=location["longitude"], ) ) if not no_commit: db_session.commit() if location["user"] != 'bootstrap': db_session.add(Config(user_id=location["user"],location_id=location["id"])) no_commit == False def config_disable_location(id, user): """ A function that disables a location configuration based on the provided ID and user. Parameters: id (int): The ID of the location to be disabled. Returns: None """ db_session.query(Config).filter(Config.location_id == id and Config.user_id == user).delete() db_session.commit() def delete_location(id): """ Deletes a location from the database based on the provided ID. Parameters: id (int): The ID of the location to be deleted. Returns: dict: A dictionary with the key "id" indicating that the location was successfully deleted. """ db_session.query(Config).filter(Config.location_id == id).delete() db_session.commit() db_session.query(Location).filter(Location.id == id).delete() db_session.commit() return {"id": "Deleted"} def chunkify_data(data, chunk_size): """ A function to split data into chunks of a specified size for processing. Parameters: - data: The input data to be chunked. - chunk_size: The size of each chunk to split the data into. Returns: - A generator that yields chunks of the data based on the specified chunk size. """ #NOTE bulk operation: Open Weather api has an upper limit of ~= 180 parameters for a request per second # so we will split the requests into chunks of 100 parameters for i in range(0, len(data), chunk_size): yield data[i:i + chunk_size] def retrieve_weather_data(location_id=None): """ A function to retrieve weather data based on location IDs. Parameters: - location_id: an optional parameter to specify the location ID(s) to retrieve weather data for. If 'all' is provided, data for all locations is returned. Returns: - If location_id is not 'all' and any specified location ID does not exist, a dictionary of invalid location IDs is returned. - If location_id is None, None is returned. - If location_id is 'all', weather data for all locations is returned in a dictionary. - Otherwise, weather data for the specified location IDs is returned in a dictionary. """ max_id = get_max_id() #NOTE: Disable location check if location_id is 'all' (returns all locations), debugging purposes if location_id != 'all': loc_check = {} for loc_id in location_id: if loc_id > max_id: loc_check[loc_id] = "The location does not exist" if len(loc_check) > 0: return {'error': loc_check} weather_data = {} #NOTE: Get weather data for all locations if location_id is 'all' (debugging purposes), otherwise get weather data for specified locations if location_id == 'all': locations = list(chunkify_data(db_session.query(Location).all(),100)) else: locations = list(chunkify_data(db_session.query(Location).filter(Location.id.in_(location_id)).all(),100)) for chunk in locations: coordinates = [] for i in range(len(chunk)): ids = get_available_ids(len(chunk)) coordinates.append([chunk[i].id, chunk[i].latitude, chunk[i].longitude, chunk[i].name]) weather_data.update(get_openmeteo_data(coordinates)) return weather_data def get_openmeteo_data(coordinates): """ Retrieves weather data from the OpenMeteo API for the given coordinates. Parameters: coordinates (list): List of tuples containing latitude, longitude, and location name. Returns: dict: A dictionary containing the retrieved weather data for each location. """ data_dict = {} cache_session = requests_cache.CachedSession('.cache', expire_after = 3600) retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2) openmeteo_client = openmeteo_requests.Client(session = retry_session) latitude = [coords[1] for coords in coordinates] longitude = [coords[2] for coords in coordinates] # NOTE: Uncomment to get all available weather data for each location # data_names = [ # "weather_code", "temperature_2m_max", "temperature_2m_min", # "apparent_temperature_max", "apparent_temperature_min", # "sunrise", "sunset", "daylight_duration", "sunshine_duration", # "uv_index_max", "uv_index_clear_sky_max", "precipitation_sum", # "rain_sum", "showers_sum", "snowfall_sum", "precipitation_hours", # "precipitation_probability_max", "wind_speed_10m_max", # "wind_gusts_10m_max", "wind_direction_10m_dominant", # "shortwave_radiation_sum", "et0_fao_evapotranspiration" # ] current_names = ["weather_code", "temperature_2m", "rain", "precipitation", "showers"] data_names = ["weather_code", "temperature_2m_max", "temperature_2m_min", "rain_sum"] #NOTE: OpenMeteo API has a limit of 10000 requests per day, # so in a production environment it would be wise to change to # an enterprise account OR utilize the clients for the requests url = "https://api.open-meteo.com/v1/forecast" params = { "latitude": latitude, "longitude": longitude, "current": current_names, "daily": data_names } responses = openmeteo_client.weather_api(url, params=params) for idx, response in enumerate(responses, start=1): location = coordinates[idx-1][3] idx = coordinates[idx-1][0] current = response.Current() #set rain to max of the available openmeteo result variables max_rain = max([float(current.Variables(2).Value()), float(current.Variables(3).Value()), float(current.Variables(3).Value())]) current_data = {"weather_code": current.Variables(0).Value(), "temperature_2m": current.Variables(1).Value(), "rain": max_rain} daily = response.Daily() daily_data = {} daily_data.update({ variable: daily.Variables(i).ValuesAsNumpy().tolist() if variable not in ["sunrise", "sunset"] else daily.Variables(i).ValuesAsNumpy() for i, variable in enumerate(data_names) }) dates = pd.date_range(start = pd.to_datetime(daily.Time(), unit = "s", utc = True), end = pd.to_datetime(daily.TimeEnd(), unit = "s", utc = True), freq = pd.Timedelta(seconds = daily.Interval()), inclusive = "left").tolist() daily_data["date"] = [date.strftime("%d/%m/%Y") for date in dates] location_data = { "id": idx, "name": location, "coordinates": [response.Latitude(), response.Longitude()] } data_dict[idx] = { "id": idx, "data": { "location": location_data, "current": current_data, "daily": daily_data } } return data_dict def search_location(query): """ Retrieve location data based on the provided query string. Parameters: query (str): The query string used to search for locations. Returns: A dictionary containing location data with an index as the key and location information as the value. """ URL="https://geocoding-api.open-meteo.com/v1/search" PARAMS = { "name": query, "count": 10, "language": 'en', "format": 'json' } response = requests.get(url = URL, params = PARAMS) data_dict={} try: for idx,d in enumerate(response.json()['results']): data_dict[idx] = d data_dict[idx].update({'selected': False}) except KeyError: data_dict[0] = {'result': 'Error'} return data_dict