diff --git a/pycode/memilio-epidata/memilio/epidata/defaultDict.py b/pycode/memilio-epidata/memilio/epidata/defaultDict.py index 7612990086..5599d0c9f5 100644 --- a/pycode/memilio-epidata/memilio/epidata/defaultDict.py +++ b/pycode/memilio-epidata/memilio/epidata/defaultDict.py @@ -51,7 +51,7 @@ 'file_format': 'json_timeasstring', 'no_raw': False, 'rep_date': False, - 'sanitize_data': 1 + 'sanitize_data': 1, } # The following dict EngEng makes sure that for all diff --git a/pycode/memilio-epidata/memilio/epidata/download_config.conf b/pycode/memilio-epidata/memilio/epidata/download_config.conf index 8dc63376f9..0a0354b8ec 100644 --- a/pycode/memilio-epidata/memilio/epidata/download_config.conf +++ b/pycode/memilio-epidata/memilio/epidata/download_config.conf @@ -32,3 +32,6 @@ no_raw = False # matplotlib backend to use mpl_backend = QtAgg + +# to_dataset allows to not generate a json file but rather return python objects +to_dataset = False \ No newline at end of file diff --git a/pycode/memilio-epidata/memilio/epidata/getCaseData.py b/pycode/memilio-epidata/memilio/epidata/getCaseData.py index d4b1d5da24..82f0256888 100644 --- a/pycode/memilio-epidata/memilio/epidata/getCaseData.py +++ b/pycode/memilio-epidata/memilio/epidata/getCaseData.py @@ -31,6 +31,7 @@ # Imports import os from datetime import date +from typing import Dict import matplotlib.pyplot as plt import numpy as np @@ -46,17 +47,25 @@ pd.options.mode.copy_on_write = True -def check_for_completeness(df, run_checks, merge_berlin=False, merge_eisenach=True): +def check_for_completeness(df: pd.DataFrame, + run_checks: bool, + merge_berlin: bool = False, + merge_eisenach: bool = True + ): """! Checks if all counties are mentioned in the case data set - This check had to be added due to incomplete data downloads - It is checked if all all counties are part of the data. - If data is incomplete the data is downloaded from another source. 
- Note: There is no check if data for every day and every county is available (which can happen). + This check had to be added due to incomplete data downloads + It is checked if all counties are part of the data. + If data is incomplete the data is downloaded from another source. + Note: There is no check if data for every day and every county is available (which can happen). - @param df pandas dataframe to check - @return Boolean to say if data is complete or not - """ + @param df pd.Dataframe. Dataframe to check + @param merge_berlin bool True or False. Defines if Berlin's districts are kept separated or get merged. Default defined in defaultDict. + @param merge_eisenach bool True or False. Defines if Eisenach districts are kept separated or get merged. Default defined in defaultDict. + @param run_checks bool + + @return Boolean to say if data is complete or not + """ if run_checks: if not df.empty: return geoger.check_for_all_counties( @@ -72,19 +81,13 @@ def check_for_completeness(df, run_checks, merge_berlin=False, merge_eisenach=Tr return True -def get_case_data(read_data=dd.defaultDict['read_data'], - file_format=dd.defaultDict['file_format'], - out_folder=dd.defaultDict['out_folder'], - start_date=dd.defaultDict['start_date'], - end_date=dd.defaultDict['end_date'], - impute_dates=dd.defaultDict['impute_dates'], - moving_average=dd.defaultDict['moving_average'], - split_berlin=dd.defaultDict['split_berlin'], - rep_date=dd.defaultDict['rep_date'], - files='All', - **kwargs - ): - """! Downloads the case data and provides different kind of structured data +def fetch_case_data( + directory: str, + filename: str, + conf_obj, + read_data: bool = dd.defaultDict['read_data'], +) -> pd.DataFrame: + """! Downloads the case data The data is read either from the internet or from a json file (CaseDataFull.json), stored in an earlier run. If the data is read from the internet, before changing anything the data is stored in CaseDataFull.json. 
@@ -93,74 +96,30 @@ def get_case_data(read_data=dd.defaultDict['read_data'], The file is read in or stored at the folder "out_folder"/Germany/. To store and change the data we use pandas. - While working with the data - - the column names are changed to english depending on defaultDict - - a new column "Date" is defined. - - we are only interested in the values where the parameter NeuerFall, NeuerTodesfall, NeuGenesen are larger than 0. - The values, when these parameters are negative are just useful, - if one would want to get the difference to the previous day. - For details we refer to the above mentioned webpage. - - For all different parameters and different columns the values are added up for whole germany for every date - and the cumulative sum is calculated. Unless something else is mentioned. - - For Berlin all districts can be merged into one [Default]. Otherwise, Berlin is divided into multiple districts and - different file names are used. - - Following data is generated and written to the mentioned filename - - All infected (current and past) for whole germany are stored in "cases_infected" - - All deaths whole germany are stored in "cases_deaths" - - Infected, deaths and recovered for whole germany are stored in "cases_all_germany" - - Infected split for states are stored in "cases_infected_state" - - Infected, deaths and recovered split for states are stored in "cases_all_state" - - Infected split for counties are stored in "cases_infected_county(_split_berlin)" - - Infected, deaths and recovered split for county are stored in "cases_all_county(_split_berlin)" - - Infected, deaths and recovered split for gender are stored in "cases_all_gender" - - Infected, deaths and recovered split for state and gender are stored in "cases_all_state_gender" - - Infected, deaths and recovered split for county and gender are stored in "cases_all_county_gender(_split_berlin)" - - Infected, deaths and recovered split for age are stored in "cases_all_age" - - Infected, 
deaths and recovered split for state and age are stored in "cases_all_state_age" - - Infected, deaths and recovered split for county and age are stored in "cases_all_county_age(_split_berlin)" + @param directory str + Path to the output directory + @param filename str + Name of the full dataset filename + @param conf_obj + configuration object + @param read_data bool. Defines if data is read from file or downloaded. Default defined in defaultDict. - @param read_data True or False. Defines if data is read from file or downloaded. Default defined in defaultDict. - @param file_format File format which is used for writing the data. Default defined in defaultDict. - @param out_folder Folder where data is written to. Default defined in defaultDict. - @param start_date Date of first date in dataframe. Default 2020-01-01. - @param end_date Date of last date in dataframe. Default defined in defaultDict. - @param impute_dates True or False. Defines if values for dates without new information are imputed. Default defined in defaultDict. - @param moving_average Integers >=0. Applies an 'moving_average'-days moving average on all time series - to smooth out effects of irregular reporting. Default defined in defaultDict. - @param split_berlin True or False. Defines if Berlin's disctricts are kept separated or get merged. Default defined in defaultDict. - @param rep_date True or False. Defines if reporting date or reference date is taken into dataframe. Default defined in defaultDict. - @param files List of strings or 'All' or 'Plot'. Defnies which files should be provided (and plotted). Default 'All'. + @return df pd.Dataframe. 
Dataframe containing the downloaded case data """ - conf = gd.Conf(out_folder, **kwargs) - out_folder = conf.path_to_use - no_raw = conf.no_raw - run_checks = conf.checks - - if (files == 'All') or (files == ['All']): - files = ['infected', 'deaths', 'all_germany', 'infected_state', - 'all_state', 'infected_county', 'all_county', 'all_gender', - 'all_state_gender', 'all_county_gender', 'all_age', - 'all_state_age', 'all_county_age'] - if (files == 'Plot') or (files == ['Plot']): - # only consider plotable files - files = ['infected', 'deaths', 'all_gender', 'all_age'] - # handle error of passing a string of one file instead of a list - if isinstance(files, str): - files = [files] - - directory = os.path.join(out_folder, 'Germany/') - gd.check_dir(directory) - filename = "CaseDataFull" + run_checks = conf_obj.checks complete = False path = os.path.join(directory + filename + ".json") + try: url = "https://media.githubusercontent.com/media/robert-koch-institut/" + \ - "SARS-CoV-2-Infektionen_in_Deutschland/main/Aktuell_Deutschland_SarsCov2_Infektionen.csv" + "SARS-CoV-2-Infektionen_in_Deutschland/main/Aktuell_Deutschland_SarsCov2_Infektionen.csv" df = gd.get_file(path, url, read_data, param_dict={}, - interactive=conf.interactive) + interactive=conf_obj.interactive) complete = check_for_completeness(df, run_checks, merge_eisenach=True) - except: + except Exception as ex: + gd.default_print(verbosity_level="Warning", + message=f"The data could not be downloaded. 
The following exception was thrown:\n{ex}") pass if complete: if not read_data: @@ -175,9 +134,10 @@ def get_case_data(read_data=dd.defaultDict['read_data'], try: url = "https://opendata.arcgis.com/datasets/66876b81065340a4a48710b062319336_0.csv" # if this file is encoded with utf-8 German umlauts are not displayed correctly because they take two bytes - # utf_8_sig can identify those bytes as one sign and display it correctly + # utf_8_sig can identify those bytes as one sign and display it + # correctly df = gd.get_file(path, url, False, param_dict={ - "encoding": 'utf_8_sig'}, interactive=conf.interactive) + "encoding": 'utf_8_sig'}, interactive=conf_obj.interactive) complete = check_for_completeness( df, run_checks, merge_eisenach=True) except: @@ -186,11 +146,12 @@ def get_case_data(read_data=dd.defaultDict['read_data'], gd.default_print( "Info", "Case data is still incomplete. Trying a third source.") try: - # If the data on github is not available we download the case data from rki from covid-19 datahub - url = "https://npgeo-de.maps.arcgis.com/sharing/rest/content/" +\ - "items/f10774f1c63e40168479a1feb6c7ca74/data" + # If the data on github is not available we download the case + # data from rki from covid-19 datahub + url = "https://npgeo-de.maps.arcgis.com/sharing/rest/content/" + \ + "items/f10774f1c63e40168479a1feb6c7ca74/data" df = gd.get_file(path, url, False, param_dict={ - "encoding": 'utf_8_sig'}, interactive=conf.interactive) + "encoding": 'utf_8_sig'}, interactive=conf_obj.interactive) df.rename(columns={'FID': "OBJECTID"}, inplace=True) complete = check_for_completeness( df, run_checks, merge_eisenach=True) @@ -200,24 +161,58 @@ def get_case_data(read_data=dd.defaultDict['read_data'], raise FileNotFoundError( "Something went wrong, dataframe is empty for csv and geojson!") - # drop columns that do not exist in data from github + # drop columns that do not exist in data from github df.drop(["Altersgruppe2", "Datenstand", "OBJECTID", 
"Bundesland", "Landkreis"], axis=1, inplace=True) + + return df + + +def preprocess_case_data(raw_df: pd.DataFrame, + directory: str, + filename: str, + conf_obj, + split_berlin: bool = dd.defaultDict['split_berlin'], + rep_date: bool = dd.defaultDict['rep_date'], + ) -> pd.DataFrame: + """! Preprocessing of the case data + + While working with the data + - the column names are changed to english depending on defaultDict + - a new column "Date" is defined. + - we are only interested in the values where the parameter NeuerFall, NeuerTodesfall, NeuGenesen are larger than 0. + The values, when these parameters are negative are just useful, + if one would want to get the difference to the previous day. + For details we refer to the above mentioned webpage. + - For all different parameters and different columns the values are added up for whole germany for every date + and the cumulative sum is calculated. Unless something else is mentioned. + - For Berlin all districts can be merged into one [Default]. Otherwise, Berlin is divided into multiple districts and + different file names are used. + + @param raw_df pd.Dataframe. Contains the downloaded or read raw case data + @param directory str + Path to the output directory + @param filename str + Name of the full dataset filename + @param conf_obj + configuration object + @param split_berlin bool. Defines if Berlin's districts are kept separated or get merged. Default defined in defaultDict. + @param rep_date bool. Defines if reporting date or reference date is taken into dataframe. Default defined in defaultDict. 
+ + @return df pd.Dataframe + """ + no_raw = conf_obj.no_raw + with progress_indicator.Spinner(message='Preparing DataFrame'): - df = df.convert_dtypes() + df = raw_df.convert_dtypes() - # output data to not always download it if not no_raw: gd.write_dataframe(df, directory, filename, "json") # store dict values in parameter to not always call dict itself - Altersgruppe = dd.GerEng['Altersgruppe'] - Geschlecht = dd.GerEng['Geschlecht'] AnzahlFall = dd.GerEng['AnzahlFall'] AnzahlGenesen = dd.GerEng['AnzahlGenesen'] AnzahlTodesfall = dd.GerEng['AnzahlTodesfall'] - IdBundesland = dd.GerEng['IdBundesland'] - IdLandkreis = dd.GerEng['IdLandkreis'] # translate column gender from German to English and standardize df.loc[df.Geschlecht == 'unbekannt', [ @@ -265,7 +260,7 @@ def get_case_data(read_data=dd.defaultDict['read_data'], # get rid of unnecessary columns df.drop(['NeuerFall', 'NeuerTodesfall', 'NeuGenesen', - "IstErkrankungsbeginn", "Meldedatum", "Refdatum"], axis=1, inplace=True) + "IstErkrankungsbeginn", "Meldedatum", "Refdatum"], axis=1, inplace=True) # merge Berlin counties if not split_berlin: @@ -277,6 +272,80 @@ def get_case_data(read_data=dd.defaultDict['read_data'], dd.EngEng['idState'], dd.EngEng['ageRKI']]) + return df + + +def write_case_data(df: pd.DataFrame, + directory: str, + conf_obj, + file_format: str = dd.defaultDict['file_format'], + start_date: date = dd.defaultDict['start_date'], + end_date: date = dd.defaultDict['end_date'], + impute_dates: bool = dd.defaultDict['impute_dates'], + moving_average: int = dd.defaultDict['moving_average'], + split_berlin: bool = dd.defaultDict['split_berlin'], + rep_date: bool = dd.defaultDict['rep_date'], + files: str or list = 'All', + ) -> None or dict: + """! Writing the different case data file. 
+ Following data is generated and written to the mentioned filename + - All infected (current and past) for whole germany are stored in "cases_infected" + - All deaths whole germany are stored in "cases_deaths" + - Infected, deaths and recovered for whole germany are stored in "cases_all_germany" + - Infected split for states are stored in "cases_infected_state" + - Infected, deaths and recovered split for states are stored in "cases_all_state" + - Infected split for counties are stored in "cases_infected_county(_split_berlin)" + - Infected, deaths and recovered split for county are stored in "cases_all_county(_split_berlin)" + - Infected, deaths and recovered split for gender are stored in "cases_all_gender" + - Infected, deaths and recovered split for state and gender are stored in "cases_all_state_gender" + - Infected, deaths and recovered split for county and gender are stored in "cases_all_county_gender(_split_berlin)" + - Infected, deaths and recovered split for age are stored in "cases_all_age" + - Infected, deaths and recovered split for state and age are stored in "cases_all_state_age" + - Infected, deaths and recovered split for county and age are stored in "cases_all_county_age(_split_berlin)" + + @param df pd.DataFrame + Processed dataframe + @param directory str + Path to the output directory + @param conf_obj + configuration object + @param file_format str + File format which is used for writing the data. Default defined in defaultDict. + @param start_date date + Date of first date in dataframe. Default 2020-01-01. + @param end_date date. Date of last date in dataframe. Default defined in defaultDict. + @param impute_dates bool True or False. Defines if values for dates without new information are imputed. Default defined in defaultDict. + @param moving_average int Integers >=0. Applies an 'moving_average'-days moving average on all time series to smooth out effects of irregular reporting. Default defined in defaultDict. 
+ @param split_berlin bool True or False. Defines if Berlin's districts are kept separated or get merged. Default defined in defaultDict. + @param rep_date bool True or False. Defines if reporting date or reference date is taken into dataframe. Default defined in defaultDict. + @param files list. List of strings or 'All' or 'Plot'. Defines which files should be provided (and plotted). Default 'All'. + + @return None + """ + + if (files == 'All') or (files == ['All']): + files = ['infected', 'deaths', 'all_germany', 'infected_state', + 'all_state', 'infected_county', 'all_county', 'all_gender', + 'all_state_gender', 'all_county_gender', 'all_age', + 'all_state_age', 'all_county_age'] + if (files == 'Plot') or (files == ['Plot']): + # only consider plotable files + files = ['infected', 'deaths', 'all_gender', 'all_age'] + # handle error of passing a string of one file instead of a list + if isinstance(files, str): + files = [files] + # dict for all files + # filename -> [groupby_list, .agg({}), groupby_index, groupby_cols, + # mod_cols] + Altersgruppe = dd.GerEng['Altersgruppe'] + Geschlecht = dd.GerEng['Geschlecht'] + AnzahlFall = dd.GerEng['AnzahlFall'] + AnzahlGenesen = dd.GerEng['AnzahlGenesen'] + AnzahlTodesfall = dd.GerEng['AnzahlTodesfall'] + IdBundesland = dd.GerEng['IdBundesland'] + IdLandkreis = dd.GerEng['IdLandkreis'] + dateToUse = dd.EngEng['date'] + # dict for all files # filename -> [groupby_list, .agg({}), groupby_index, groupby_cols, mod_cols] dict_files = { @@ -287,12 +356,14 @@ def get_case_data(read_data=dd.defaultDict['read_data'], 'infected_state': [[dateToUse, IdBundesland], {AnzahlFall: "sum"}, [IdBundesland], {dd.EngEng["idState"]: geoger.get_state_ids()}, ['Confirmed']], 'all_state': [[dateToUse, IdBundesland], {AnzahlFall: "sum", AnzahlTodesfall: "sum", AnzahlGenesen: "sum"}, - [IdBundesland], {dd.EngEng["idState"]: geoger.get_state_ids()}, + [IdBundesland], {dd.EngEng["idState"] + : geoger.get_state_ids()}, ['Confirmed', 'Deaths', 
'Recovered']], 'infected_county': [[dateToUse, IdLandkreis], {AnzahlFall: "sum"}, [IdLandkreis], {dd.EngEng["idCounty"]: df[dd.EngEng["idCounty"]].unique()}, ['Confirmed']], 'all_county': [[dateToUse, IdLandkreis], {AnzahlFall: "sum", AnzahlTodesfall: "sum", AnzahlGenesen: "sum"}, - [IdLandkreis], {dd.EngEng["idCounty"]: df[dd.EngEng["idCounty"]].unique()}, + [IdLandkreis], {dd.EngEng["idCounty"] + : df[dd.EngEng["idCounty"]].unique()}, ['Confirmed', 'Deaths', 'Recovered']], 'all_gender': [[dateToUse, Geschlecht], {AnzahlFall: "sum", AnzahlTodesfall: "sum", AnzahlGenesen: "sum"}, [Geschlecht], {dd.EngEng["gender"]: list( @@ -306,12 +377,13 @@ def get_case_data(read_data=dd.defaultDict['read_data'], ['Confirmed', 'Deaths', 'Recovered']], 'all_county_gender': [[dateToUse, IdLandkreis, Geschlecht], {AnzahlFall: "sum", AnzahlTodesfall: "sum", - AnzahlGenesen: "sum"}, [IdLandkreis, Geschlecht], + AnzahlGenesen: "sum"}, [IdLandkreis, Geschlecht], {dd.EngEng["idCounty"]: df[dd.EngEng["idCounty"]].unique( ), dd.EngEng["gender"]: list(df[dd.EngEng["gender"]].unique())}, ['Confirmed', 'Deaths', 'Recovered']], 'all_age': [[dateToUse, Altersgruppe], {AnzahlFall: "sum", AnzahlTodesfall: "sum", AnzahlGenesen: "sum"}, - [Altersgruppe], {dd.EngEng["ageRKI"]: df[dd.EngEng["ageRKI"]].unique()}, + [Altersgruppe], {dd.EngEng["ageRKI"] + : df[dd.EngEng["ageRKI"]].unique()}, ['Confirmed', 'Deaths', 'Recovered']], 'all_state_age': [[dateToUse, IdBundesland, Altersgruppe], {AnzahlFall: "sum", AnzahlTodesfall: "sum", AnzahlGenesen: "sum"}, [ @@ -323,22 +395,25 @@ def get_case_data(read_data=dd.defaultDict['read_data'], {AnzahlFall: "sum", AnzahlTodesfall: "sum", AnzahlGenesen: "sum"}, [ IdLandkreis, Altersgruppe], {dd.EngEng["idCounty"]: df[dd.EngEng["idCounty"]].unique(), - dd.EngEng["ageRKI"]: df[dd.EngEng["ageRKI"]].unique()}, + dd.EngEng["ageRKI"]: df[dd.EngEng["ageRKI"]].unique()}, ['Confirmed', 'Deaths', 'Recovered']] } + dict_of_datasets = dict() + with progress_indicator.Spinner(): 
for file in files: if file not in dict_files.keys(): - raise gd.DataError('Error: File '+file+' cannot be written.') + raise gd.DataError('Error: File ' + file + + ' cannot be written.') # split berlin is only relevant for county level - if ('county' in file) and (split_berlin == True): + if ('county' in file) and (split_berlin is True): split_berlin_local = True else: # dont append _split_berlin to filename on germany/state level split_berlin_local = False filename = 'cases_' + \ - gd.append_filename(file, impute_dates, - moving_average, split_berlin_local, rep_date) + gd.append_filename(file, impute_dates, + moving_average, split_berlin_local, rep_date) # sum over all columns defined in dict_files df_local = df.groupby(dict_files[file][0]).agg(dict_files[file][1]) @@ -363,58 +438,121 @@ def get_case_data(read_data=dd.defaultDict['read_data'], df_local_cs = mdfs.extract_subframe_based_on_dates( df_local_cs, start_date, end_date) - gd.write_dataframe(df_local_cs, directory, filename, file_format) - - if conf.plot: - if file == 'infected': - # make plot - df_local_cs.plot(title='COVID-19 infections', grid=True, - style='-o') - plt.tight_layout() - plt.show() - - if file == 'deaths': - df_local_cs.plot(title='COVID-19 deaths', grid=True, - style='-o') - plt.tight_layout() - plt.show() - - df.agg({AnzahlFall: "sum", AnzahlTodesfall: "sum", AnzahlGenesen: "sum"}) \ - .plot(title='COVID-19 infections, deaths, recovered', grid=True, - kind='bar') - plt.tight_layout() - plt.show() - - if file == 'all_gender': - df.groupby(Geschlecht).agg( - {AnzahlFall: "sum", AnzahlTodesfall: "sum", - AnzahlGenesen: "sum"}).plot( - title='COVID-19 infections, deaths, recovered', - grid=True, kind='bar') - plt.tight_layout() - plt.show() - - if file == 'all_age': - df.groupby(Altersgruppe).agg( - {AnzahlFall: "sum", AnzahlTodesfall: "sum", AnzahlGenesen: "sum"}).plot( - title='COVID-19 infections, deaths, recovered for diff ages', - grid=True, kind='bar') - plt.tight_layout() - plt.show() 
- - # Dead by "Altersgruppe": - df_local = df.groupby(Altersgruppe).agg( - {AnzahlTodesfall: "sum"}) - - df_local.plot(title='COVID-19 deaths', grid=True, - kind='bar') - plt.tight_layout() - plt.show() + if not conf_obj.to_dataset: + gd.write_dataframe(df_local_cs, directory, + filename, file_format) + else: + dict_of_datasets.update({file: df_local_cs}) + if conf_obj.to_dataset is True: + return dict_of_datasets + + +def get_case_data(read_data: bool = dd.defaultDict['read_data'], + out_folder: str = dd.defaultDict['out_folder'], + file_format: str = dd.defaultDict['file_format'], + start_date: date = dd.defaultDict['start_date'], + end_date: date = dd.defaultDict['end_date'], + impute_dates: bool = dd.defaultDict['impute_dates'], + moving_average: int = dd.defaultDict['moving_average'], + split_berlin: bool = dd.defaultDict['split_berlin'], + rep_date: bool = dd.defaultDict['rep_date'], + files: str or list = 'All', + **kwargs + ) -> Dict: + """! Wrapper function that downloads the case data and provides different kind of structured data into json files. + + The data is read either from the internet or from a json file (CaseDataFull.json), stored in an earlier run. + If the data is read from the internet, before changing anything the data is stored in CaseDataFull.json. + If data should be downloaded, it is checked if data contains all counties. + If not a different source is tried. + The file is read in or stored at the folder "out_folder"/Germany/. + To store and change the data we use pandas. + + While working with the data + - the column names are changed to english depending on defaultDict + - a new column "Date" is defined. + - we are only interested in the values where the parameter NeuerFall, NeuerTodesfall, NeuGenesen are larger than 0. + The values, when these parameters are negative are just useful, + if one would want to get the difference to the previous day. + For details we refer to the above mentioned webpage. 
+ - For all different parameters and different columns the values are added up for whole germany for every date + and the cumulative sum is calculated. Unless something else is mentioned. + - For Berlin all districts can be merged into one [Default]. Otherwise, Berlin is divided into multiple districts and + different file names are used. + - Following data is generated and written to the mentioned filename + - All infected (current and past) for whole germany are stored in "cases_infected" + - All deaths whole germany are stored in "cases_deaths" + - Infected, deaths and recovered for whole germany are stored in "cases_all_germany" + - Infected split for states are stored in "cases_infected_state" + - Infected, deaths and recovered split for states are stored in "cases_all_state" + - Infected split for counties are stored in "cases_infected_county(_split_berlin)" + - Infected, deaths and recovered split for county are stored in "cases_all_county(_split_berlin)" + - Infected, deaths and recovered split for gender are stored in "cases_all_gender" + - Infected, deaths and recovered split for state and gender are stored in "cases_all_state_gender" + - Infected, deaths and recovered split for county and gender are stored in "cases_all_county_gender(_split_berlin)" + - Infected, deaths and recovered split for age are stored in "cases_all_age" + - Infected, deaths and recovered split for state and age are stored in "cases_all_state_age" + - Infected, deaths and recovered split for county and age are stored in "cases_all_county_age(_split_berlin)" + + @param read_data True or False. Defines if data is read from file or downloaded. Default defined in defaultDict. + @param file_format File format which is used for writing the data. Default defined in defaultDict. + @param out_folder Folder where data is written to. Default defined in defaultDict. + @param start_date Date of first date in dataframe. Default 2020-01-01. + @param end_date Date of last date in dataframe. 
Default defined in defaultDict. + @param impute_dates True or False. Defines if values for dates without new information are imputed. Default defined in defaultDict. + @param moving_average Integers >=0. Applies an 'moving_average'-days moving average on all time series + to smooth out effects of irregular reporting. Default defined in defaultDict. + @param split_berlin True or False. Defines if Berlin's districts are kept separated or get merged. Default defined in defaultDict. + @param rep_date True or False. Defines if reporting date or reference date is taken into dataframe. Default defined in defaultDict. + @param files List of strings or 'All' or 'Plot'. Defines which files should be provided (and plotted). Default 'All'. + @param to_dataset bool True or False. Whether to return the dataframe as an object instead of json file. + If True - returns objects with dataframes + If False - write dataframes into files + Default defined in defaultDict. + + @return None + """ + + conf = gd.Conf(out_folder, **kwargs) + out_folder = conf.path_to_use + + directory = os.path.join(out_folder, 'Germany/') + gd.check_dir(directory) + filename = "CaseDataFull" + + raw_df = fetch_case_data( + read_data=read_data, + directory=directory, + filename=filename, + conf_obj=conf, + ) + preprocess_df = preprocess_case_data( + raw_df=raw_df, + split_berlin=split_berlin, + rep_date=rep_date, + conf_obj=conf, + filename=filename, + directory=directory, + ) + datasets = write_case_data( + directory=directory, + df=preprocess_df, + file_format=file_format, + start_date=start_date, + end_date=end_date, + impute_dates=impute_dates, + moving_average=moving_average, + split_berlin=split_berlin, + rep_date=rep_date, + files=files, + conf_obj=conf + ) + if conf.to_dataset is True: + return datasets def main(): """! 
Main program entry.""" - arg_dict = gd.cli("cases") get_case_data(**arg_dict) diff --git a/pycode/memilio-epidata/memilio/epidata/getDIVIData.py b/pycode/memilio-epidata/memilio/epidata/getDIVIData.py index 8e7b5317df..dc5b58ef9b 100644 --- a/pycode/memilio-epidata/memilio/epidata/getDIVIData.py +++ b/pycode/memilio-epidata/memilio/epidata/getDIVIData.py @@ -37,6 +37,7 @@ import os from datetime import date +from typing import Tuple, Dict import pandas as pd @@ -46,15 +47,13 @@ from memilio.epidata import modifyDataframeSeries as mdfs -def get_divi_data(read_data=dd.defaultDict['read_data'], - file_format=dd.defaultDict['file_format'], - out_folder=dd.defaultDict['out_folder'], - start_date=date(2020, 4, 24), - end_date=dd.defaultDict['end_date'], - impute_dates=dd.defaultDict['impute_dates'], - moving_average=dd.defaultDict['moving_average'], - **kwargs - ): +def fetch_divi_data( + directory: str, + filename: str, + conf_obj, + read_data: bool = dd.defaultDict['read_data'], + file_format: str = dd.defaultDict['file_format'], +) -> pd.DataFrame: """! Downloads or reads the DIVI ICU data and writes them in different files. Available data starts from 2020-04-24. @@ -62,52 +61,72 @@ def get_divi_data(read_data=dd.defaultDict['read_data'], If it does not already exist, the folder Germany is generated in the given out_folder. If read_data == True and the file "FullData_DIVI.json" exists, the data is read form this file and stored in a pandas dataframe. If read_data = True and the file does not exist the program is stopped. - The downloaded dataframe is written to the file "FullData_DIVI". - After that, the columns are renamed to English and the state and county names are added. - Afterwards, three kinds of structuring of the data are done. - We obtain the chronological sequence of ICU and ICU_ventilated - stored in the files "county_divi".json", "state_divi.json" and "germany_divi.json" - for counties, states and whole Germany, respectively. 
- - @param read_data True or False. Defines if data is read from file or downloaded. Default defined in defaultDict. - @param file_format File format which is used for writing the data. Default defined in defaultDict. - @param out_folder Folder where data is written to. Default defined in defaultDict. - @param start_date Date of first date in dataframe. Default defined in defaultDict. - @param end_date Date of last date in dataframe. Default defined in defaultDict. - @param impute_dates True or False. Defines if values for dates without new information are imputed. Default defined in defaultDict. - @param moving_average Integers >=0. Applies an 'moving_average'-days moving average on all time series - to smooth out effects of irregular reporting. Default defined in defaultDict. - """ - - conf = gd.Conf(out_folder, **kwargs) - out_folder = conf.path_to_use - no_raw = conf.no_raw - # First csv data on 24-04-2020 - if start_date < date(2020, 4, 24): - gd.default_print('Warning', "First data available on 2020-04-24. " - "You asked for " + start_date.strftime("%Y-%m-%d") + - ". Changed it to 2020-04-24.") - start_date = date(2020, 4, 24) + @param directory str + Path to the output directory + @param conf_obj + configuration object + @param filename str + File format which is used for writing the data. Default defined in defaultDict. + @param read_data bool. True or False. Defines if data is read from file or downloaded. Default defined in defaultDict. + @param file_format str. File format which is used for writing the data. Default defined in defaultDict. - directory = os.path.join(out_folder, 'Germany/') - gd.check_dir(directory) + @return Tuple[df_raw, start_date] Tuple. 
Contains the fetched data as well as the adjusted starting date + """ + no_raw = conf_obj.no_raw - filename = "FullData_DIVI" - url = "https://raw.githubusercontent.com/robert-koch-institut/"\ - "Intensivkapazitaeten_und_COVID-19-Intensivbettenbelegung_in_Deutschland/"\ - "main/Intensivregister_Landkreise_Kapazitaeten.csv" + url = "https://raw.githubusercontent.com/robert-koch-institut/" \ + "Intensivkapazitaeten_und_COVID-19-Intensivbettenbelegung_in_Deutschland/" \ + "main/Intensivregister_Landkreise_Kapazitaeten.csv" path = os.path.join(directory + filename + ".json") df_raw = gd.get_file(path, url, read_data, param_dict={}, - interactive=conf.interactive) - + interactive=conf_obj.interactive) if not df_raw.empty: if not no_raw: gd.write_dataframe(df_raw, directory, filename, file_format) else: raise gd.DataError("Something went wrong, dataframe is empty.") - if conf.checks == True: + if conf_obj.checks is True: + divi_data_sanity_checks(df_raw) + else: + gd.default_print( + "Warning", "Sanity checks for DIVI data have not been executed.") + return df_raw + + +def preprocess_divi_data(df_raw: pd.DataFrame, + conf_obj, + start_date: date = date(2020, 4, 24), + end_date: date = dd.defaultDict['end_date'], + impute_dates: bool = dd.defaultDict['impute_dates'], + moving_average: int = dd.defaultDict['moving_average'], + ) -> Tuple[pd.DataFrame, pd.DataFrame]: + """! Processing of the downloaded data + * the columns are renamed to English and the state and county names are added. + + @param df_raw pd.DataFrame + @param conf_obj + configuration object + @param start_date date The first date in dataframe. Default defined in defaultDict. + @param end_date date The last date in dataframe. Default defined in defaultDict. + @param impute_dates bool Defines if values for dates without new information are imputed. Default defined in defaultDict. 
+ @param moving_average int Integers >=0. Applies an 'moving_average'-days moving average on all time series to smooth out effects of irregular reporting. Default defined in defaultDict. + @param **kwargs + + @return df pd.DataFrame processed divi data + """ + # First csv data on 24-04-2020 + if start_date < date(2020, 4, 24): + gd.default_print( + 'Warning', + "First data available on 2020-04-24. " + "You asked for " + + start_date.strftime("%Y-%m-%d") + + ". Changed it to 2020-04-24.") + start_date = date(2020, 4, 24) + + if conf_obj.checks is True: divi_data_sanity_checks(df_raw) else: gd.default_print( @@ -121,14 +140,14 @@ def get_divi_data(read_data=dd.defaultDict['read_data'], try: df[dd.EngEng['date']] = pd.to_datetime( df[dd.EngEng['date']], format="%Y-%m-%d %H:%M:%S") - except: + except BaseException: raise gd.DataError( "Time data can't be transformed to intended format") # remove leading zeros for ID_County (if not yet done) df['ID_County'] = df['ID_County'].astype(int) # add missing dates (and compute moving average) - if (impute_dates == True) or (moving_average > 0): + if (impute_dates is True) or (moving_average > 0): df = mdfs.impute_and_reduce_df( df, {dd.EngEng["idCounty"]: df[dd.EngEng["idCounty"]].unique()}, [dd.EngEng["ICU"], @@ -138,13 +157,42 @@ def get_divi_data(read_data=dd.defaultDict['read_data'], # add names etc for empty frames (counties where no ICU beds are available) countyid_to_stateid = geoger.get_countyid_to_stateid_map() - for id in df.loc[df.isna().any(axis=1), dd.EngEng['idCounty']].unique(): - stateid = countyid_to_stateid[id] - df.loc[df[dd.EngEng['idCounty']] == id, dd.EngEng['idState']] = stateid + for county_id in df.loc[df.isna().any(axis=1), dd.EngEng['idCounty']].unique(): + state_id = countyid_to_stateid[county_id] + df.loc[df[dd.EngEng['idCounty']] == county_id, + dd.EngEng['idState']] = state_id # extract subframe of dates df = mdfs.extract_subframe_based_on_dates(df, start_date, end_date) + return df, df_raw + + +def 
write_divi_data(df: pd.DataFrame, + directory: str, + conf_obj, + file_format: str = dd.defaultDict['file_format'], + impute_dates: bool = dd.defaultDict['impute_dates'], + moving_average: int = dd.defaultDict['moving_average'], + ) -> Dict: + """! Write the divi data into json files + + Three kinds of structuring of the data are done. + We obtain the chronological sequence of ICU and ICU_ventilated + stored in the files "county_divi.json", "state_divi.json" and "germany_divi.json" + for counties, states and whole Germany, respectively. + + @param df pd.DataFrame. Dataframe containing processed divi data + @param directory str + Path to the output directory + @param conf_obj + configuration object + @param file_format str. File format which is used for writing the data. Default defined in defaultDict. + @param impute_dates bool True or False. Defines if values for dates without new information are imputed. Default defined in defaultDict. + @param moving_average int Integers >=0. Applies an 'moving_average'-days moving average on all time series to smooth out effects of irregular reporting. Default defined in defaultDict. 
+ + @return data_dict Dict Dictionary containing datasets at county, state and national level + """ # write data for counties to file df_counties = df[[dd.EngEng["idCounty"], dd.EngEng["county"], @@ -155,9 +203,6 @@ def get_divi_data(read_data=dd.defaultDict['read_data'], df_counties = geoger.merge_df_counties_all( df_counties, sorting=[dd.EngEng["idCounty"], dd.EngEng["date"]]) # save - filename = "county_divi" - filename = gd.append_filename(filename, impute_dates, moving_average) - gd.write_dataframe(df_counties, directory, filename, file_format) # write data for states to file df_states = df.groupby( @@ -168,23 +213,106 @@ def get_divi_data(read_data=dd.defaultDict['read_data'], df_states.reset_index(inplace=True) df_states.sort_index(axis=1, inplace=True) - filename = "state_divi" - filename = gd.append_filename(filename, impute_dates, moving_average) - gd.write_dataframe(df_states, directory, filename, file_format) - # write data for germany to file df_ger = df.groupby(["Date"]).agg({"ICU": "sum", "ICU_ventilated": "sum"}) df_ger.reset_index(inplace=True) df_ger.sort_index(axis=1, inplace=True) - filename = "germany_divi" - filename = gd.append_filename(filename, impute_dates, moving_average) - gd.write_dataframe(df_ger, directory, filename, file_format) + if not conf_obj.to_dataset: + filename = "county_divi" + filename = gd.append_filename(filename, impute_dates, moving_average) + gd.write_dataframe(df_counties, directory, filename, file_format) + + filename = "state_divi" + filename = gd.append_filename(filename, impute_dates, moving_average) + gd.write_dataframe(df_states, directory, filename, file_format) + + filename = "germany_divi" + filename = gd.append_filename(filename, impute_dates, moving_average) + gd.write_dataframe(df_ger, directory, filename, file_format) + + data_dict = { + "counties": df_counties, + "states": df_states, + "Germany": df_ger + } + return data_dict + + +def get_divi_data(read_data: bool = dd.defaultDict['read_data'], + 
file_format: str = dd.defaultDict['file_format'], + out_folder: str = dd.defaultDict['out_folder'], + start_date: date = date(2020, 4, 24), + end_date: date = dd.defaultDict['end_date'], + impute_dates: bool = dd.defaultDict['impute_dates'], + moving_average: int = dd.defaultDict['moving_average'], + **kwargs + ): + """! Downloads or reads the DIVI ICU data and writes them in different files. + + Available data starts from 2020-04-24. + If the given start_date is earlier, it is changed to this date and a warning is printed. + If it does not already exist, the folder Germany is generated in the given out_folder. + If read_data == True and the file "FullData_DIVI.json" exists, the data is read form this file + and stored in a pandas dataframe. If read_data = True and the file does not exist the program is stopped. + + The downloaded dataframe is written to the file "FullData_DIVI". + After that, the columns are renamed to English and the state and county names are added. + Afterwards, three kinds of structuring of the data are done. + We obtain the chronological sequence of ICU and ICU_ventilated + stored in the files "county_divi".json", "state_divi.json" and "germany_divi.json" + for counties, states and whole Germany, respectively. + + @param read_data True or False. Defines if data is read from file or downloaded. Default defined in defaultDict. + @param file_format File format which is used for writing the data. Default defined in defaultDict. + @param out_folder Folder where data is written to. Default defined in defaultDict. + @param start_date Date of first date in dataframe. Default defined in defaultDict. + @param end_date Date of last date in dataframe. Default defined in defaultDict. + @param impute_dates True or False. Defines if values for dates without new information are imputed. Default defined in defaultDict. + @param moving_average Integers >=0. 
Applies an 'moving_average'-days moving average on all time series + to smooth out effects of irregular reporting. Default defined in defaultDict. + @param to_dataset bool True or False. Whether to return the dataframe as an object instead of json file. + If True - returns objects with dataframes + If False - write dataframes into files + Default defined in defaultDict. + """ + conf = gd.Conf(out_folder, **kwargs) + out_folder = conf.path_to_use - return (df_raw, df_counties, df_states, df_ger) + directory = os.path.join(out_folder, 'Germany/') + gd.check_dir(directory) + filename = "FullData_DIVI" -def divi_data_sanity_checks(df=pd.DataFrame()): + downloaded_data_df = fetch_divi_data( + directory=directory, + conf_obj=conf, + filename=filename, + read_data=read_data, + file_format=file_format, + ) + + preprocess_df, df_raw = preprocess_divi_data( + conf_obj=conf, + df_raw=downloaded_data_df, + start_date=start_date, + end_date=end_date, + impute_dates=impute_dates, + moving_average=moving_average, + ) + datasets = write_divi_data( + df=preprocess_df, + directory=directory, + file_format=file_format, + impute_dates=impute_dates, + moving_average=moving_average, + conf_obj=conf, + ) + datasets['raw_data'] = df_raw + return datasets + + +def divi_data_sanity_checks(df: pd.DataFrame) -> None: """! Checks the sanity of the divi_data dataframe Checks if type of the given data is a dataframe @@ -209,15 +337,16 @@ def divi_data_sanity_checks(df=pd.DataFrame()): # check if headers are those we want for name in test_strings: - if (name not in actual_strings_list): + if name not in actual_strings_list: raise gd.DataError("Error: Data categories have changed.") # check if size of dataframe is not unusal # data colletion starts at 24.04.2020 # TODO: Number of reporting counties get less with time. - # Maybe we should look for a new method to sanitize the size of the DataFrame. + # Maybe we should look for a new method to sanitize the size of the + # DataFrame. 
num_dates = (date.today() - date(2020, 4, 24)).days - min_num_data = 380*num_dates # not all 400 counties report every day - max_num_data = 400*num_dates + min_num_data = 380 * num_dates # not all 400 counties report every day + max_num_data = 400 * num_dates if (len(df) < min_num_data) or (len(df) > max_num_data): raise gd.DataError("Error: unexpected length of dataframe.") diff --git a/pycode/memilio-epidata/memilio/epidata/getDataIntoPandasDataFrame.py b/pycode/memilio-epidata/memilio/epidata/getDataIntoPandasDataFrame.py index c1df694825..02c5b9a6ef 100644 --- a/pycode/memilio-epidata/memilio/epidata/getDataIntoPandasDataFrame.py +++ b/pycode/memilio-epidata/memilio/epidata/getDataIntoPandasDataFrame.py @@ -84,7 +84,6 @@ def __init__(self, out_folder, **kwargs): # activate CoW for more predictable behaviour of pandas DataFrames pd.options.mode.copy_on_write = True - # read in config file # if no config file is given, use default values if os.path.exists(path): @@ -105,12 +104,17 @@ def __init__(self, out_folder, **kwargs): if key not in kwargs: kwargs.update({key: parser['SETTINGS'][key]}) - Conf.show_progr = True if kwargs['show_progress'] == 'True' else False + Conf.show_progr = True if str( + kwargs['show_progress']) == 'True' else False Conf.v_level = str(kwargs['verbosity_level']) - self.checks = True if kwargs['run_checks'] == 'True' else False - self.interactive = True if kwargs['interactive'] == 'True' else False - self.plot = True if kwargs['make_plot'] == 'True' else False - self.no_raw = True if kwargs['no_raw'] == 'True' else False + self.checks = True if str( + kwargs['run_checks']) == 'True' else False + self.interactive = True if str( + kwargs['interactive']) == 'True' else False + self.plot = True if str(kwargs['make_plot']) == 'True' else False + self.no_raw = True if str(kwargs['no_raw']) == 'True' else False + self.to_dataset = True if str( + kwargs['to_dataset']) == 'True' else False else: # default values: Conf.show_progr = 
kwargs['show_progress'] if 'show_progress' in kwargs.keys( @@ -126,6 +130,8 @@ def __init__(self, out_folder, **kwargs): self.no_raw = kwargs['no_raw'] if 'no_raw' in kwargs.keys( ) else dd.defaultDict['no_raw'] self.path_to_use = out_folder + self.to_dataset = kwargs['to_dataset'] if 'to_dataset' in kwargs.keys( + ) else False # suppress Future & DepricationWarnings if VerbosityLevel[Conf.v_level].value <= 2: @@ -354,6 +360,7 @@ def cli(what): - no_raw - username - password + - to_dataset @param what Defines what packages calls and thus what kind of command line arguments should be defined. """ @@ -493,6 +500,13 @@ def cli(what): parser.add_argument( '--password', type=str ) + if '--to-dataset' in sys.argv: + parser.add_argument( + '--to-dataset', dest='to_dataset', + help="To return saved dataframes as objects.", + action='store_true' + ) + args = vars(parser.parse_args()) return args diff --git a/pycode/memilio-epidata/memilio/epidata/getPopulationData.py b/pycode/memilio-epidata/memilio/epidata/getPopulationData.py index 9601fc7440..fa867918c6 100644 --- a/pycode/memilio-epidata/memilio/epidata/getPopulationData.py +++ b/pycode/memilio-epidata/memilio/epidata/getPopulationData.py @@ -42,7 +42,7 @@ def read_population_data(username, password): - '''! Reads Population data from regionalstatistik.de + """! Reads Population data from regionalstatistik.de Username and Password are required to sign in on regionalstatistik.de. A request is made to regionalstatistik.de and the StringIO is read in as a csv into the dataframe format. @@ -50,7 +50,7 @@ def read_population_data(username, password): @param username Username to sign in at regionalstatistik.de. @param password Password to sign in at regionalstatistik.de. 
@return DataFrame - ''' + """ download_url = 'https://www.regionalstatistik.de/genesis/online?operation=download&code=12411-02-03-4&option=csv' req = requests.get(download_url, auth=(username, password)) @@ -63,14 +63,14 @@ def read_population_data(username, password): def path_to_credential_file(): - '''Returns path to .ini file where credentials are stored. + """! Returns path to .ini file where credentials are stored. The Path can be changed if neccessary. - ''' + """ return os.path.join(os.path.dirname(os.path.abspath(__file__)), 'CredentialsRegio.ini') def manage_credentials(interactive): - '''! Manages credentials for regionalstatistik.de (needed for dowload). + """! Manages credentials for regionalstatistik.de (needed for dowload). A connfig file inside the epidata folder is either written (if not existent yet) with input from user or read with following format: @@ -79,7 +79,7 @@ def manage_credentials(interactive): Password = XXXXX @return Username and password to sign in at regionalstatistik.de. - ''' + """ # path where ini file is found path = path_to_credential_file() @@ -118,8 +118,8 @@ def manage_credentials(interactive): return username, password -def export_population_dataframe(df_pop, directory, file_format, merge_eisenach): - '''! Writes population dataframe into directory with new column names and age groups +def export_population_dataframe(df_pop: pd.DataFrame, directory: str, file_format: str, merge_eisenach: bool): + """! Writes population dataframe into directory with new column names and age groups @param df_pop Population data DataFrame to be exported @param directory Directory where data is written to. @@ -128,7 +128,7 @@ def export_population_dataframe(df_pop, directory, file_format, merge_eisenach): and 'Eisenach' are listed separately or combined as one entity 'Wartburgkreis'. 
@return exported DataFrame - ''' + """ new_cols = [ dd.EngEng['idCounty'], @@ -194,7 +194,7 @@ def export_population_dataframe(df_pop, directory, file_format, merge_eisenach): def assign_population_data(df_pop_raw, counties, age_cols, idCounty_idx): - '''! Assigns population data of all counties of old dataframe in new created dataframe + """! Assigns population data of all counties of old dataframe in new created dataframe In df_pop_raw there might be additional information like federal states, governing regions etc. which is not necessary for the dataframe. @@ -205,7 +205,7 @@ def assign_population_data(df_pop_raw, counties, age_cols, idCounty_idx): @param age_cols Age groups in old DataFrame @param idCountyidx indexes in old DataFrame where data of corresponding county starts @return new DataFrame - ''' + """ new_cols = {dd.EngEng['idCounty']: counties[:, 1], dd.EngEng['county']: counties[:, 0]} @@ -283,45 +283,25 @@ def test_total_population(df_pop, age_cols): raise gd.DataError('Total Population does not match expectation.') -def get_population_data(read_data=dd.defaultDict['read_data'], - file_format=dd.defaultDict['file_format'], - out_folder=dd.defaultDict['out_folder'], - merge_eisenach=True, - username='', - password='', - **kwargs): - """! Download age-stratified population data for the German counties. - - The data we use is: - Official 'Bevölkerungsfortschreibung' 12411-02-03-4: - 'Bevölkerung nach Geschlecht und Altersgruppen (17)' - of regionalstatistik.de. - ATTENTION: The raw file cannot be downloaded - automatically by our scripts without an Genesis Online account. In order to - work on this dataset, please enter your username and password or manually download it from: - - https://www.regionalstatistik.de/genesis/online -> "1: Gebiet, Bevölkerung, - Arbeitsmarkt, Wahlen" -> "12: Bevölkerung" -> "12411 Fortschreibung des - Bevölkerungsstandes" -> "12411-02-03-4: Bevölkerung nach Geschlecht und - Altersgruppen (17) - Stichtag 31.12. 
- regionale Tiefe: Kreise und - krfr. Städte". - - Download the xlsx or csv file and put it under dd.defaultDict['out_folder'], - this normally is Memilio/data/pydata/Germany. - The folders 'pydata/Germany' have to be created if they do not exist yet. - Then this script can be run. +def fetch_population_data(read_data: bool = dd.defaultDict['read_data'], + out_folder: str = dd.defaultDict['out_folder'], + username='', + password='', + **kwargs + ) -> pd.DataFrame: + """! Downloads or reads the population data. + If it does not already exist, the folder Germany is generated in the given out_folder. + If read_data == True and the file "FullData_population.json" exists, the data is read form this file + and stored in a pandas dataframe. If read_data = True and the file does not exist the program is stopped. + The downloaded dataframe is written to the file "FullData_population". @param read_data False or True. Defines if data is read from file or downloaded. Default defined in defaultDict. - @param file_format File format which is used for writing the data. - Default defined in defaultDict. @param out_folder Path to folder where data is written in folder out_folder/Germany. Default defined in defaultDict. - @param merge_eisenach [Default: True] or False. Defines whether the - counties 'Wartburgkreis' and 'Eisenach' are listed separately or - combined as one entity 'Wartburgkreis'. - @param username Username to sign in at regionalstatistik.de. + @param username Username to sign in at regionalstatistik.de. @param password Password to sign in at regionalstatistik.de. + @return DataFrame with adjusted population data for all ages to current level. """ conf = gd.Conf(out_folder, **kwargs) @@ -341,6 +321,22 @@ def get_population_data(read_data=dd.defaultDict['read_data'], df_pop_raw = read_population_data(username, password) + return df_pop_raw + + +def preprocess_population_data(df_pop_raw: pd.DataFrame, + merge_eisenach: bool = True, + ) -> pd.DataFrame: + """! 
Processing of the downloaded data + * the columns are renamed to English and the state and county names are added. + + @param df_pop_raw pd.DataFrame. A Dataframe containing input population data + @param merge_eisenach [Default: True] or False. Defines whether the + counties 'Wartburgkreis' and 'Eisenach' are listed separately or + combined as one entity 'Wartburgkreis'. + + @return df pd.DataFrame. Processed population data + """ column_names = list(df_pop_raw.columns) # rename columns rename_columns = { @@ -381,12 +377,96 @@ def get_population_data(read_data=dd.defaultDict['read_data'], df_pop = assign_population_data( df_pop_raw, counties, age_cols, idCounty_idx) - test_total_population(df_pop, age_cols) + return df_pop + +def write_population_data(df_pop: pd.DataFrame, + out_folder: str = dd.defaultDict['out_folder'], + file_format: str = dd.defaultDict['file_format'], + merge_eisenach: bool = True + ) -> None or pd.DataFrame: + """! Write the population data into json files + Three kinds of structuring of the data are done. + We obtain the chronological sequence of ICU and ICU_ventilated + stored in the files "county_population".json", "state_population.json" and "germany_population.json" + for counties, states and whole Germany, respectively. + + @param df_pop pd.DataFrame. A Dataframe containing processed population data + @param file_format str. File format which is used for writing the data. Default defined in defaultDict. + @param out_folder str. Folder where data is written to. Default defined in defaultDict. + @param merge_eisenach [Default: True] or False. Defines whether the + counties 'Wartburgkreis' and 'Eisenach' are listed separately or + combined as one entity 'Wartburgkreis'. 
+ + @return None + """ + directory = os.path.join(out_folder, 'Germany') df_pop_export = export_population_dataframe( df_pop, directory, file_format, merge_eisenach) + return df_pop_export + + +def get_population_data(read_data: bool = dd.defaultDict['read_data'], + file_format: str = dd.defaultDict['file_format'], + out_folder: str = dd.defaultDict['out_folder'], + merge_eisenach: bool = True, + username='', + password='', + **kwargs + ): + """! Download age-stratified population data for the German counties. + The data we use is: + Official 'Bevölkerungsfortschreibung' 12411-02-03-4: + 'Bevölkerung nach Geschlecht und Altersgruppen (17)' + of regionalstatistik.de. + ATTENTION: The raw file cannot be downloaded + automatically by our scripts without an Genesis Online account. In order to + work on this dataset, please enter your username and password or manually download it from: + + https://www.regionalstatistik.de/genesis/online -> "1: Gebiet, Bevölkerung, + Arbeitsmarkt, Wahlen" -> "12: Bevölkerung" -> "12411 Fortschreibung des + Bevölkerungsstandes" -> "12411-02-03-4: Bevölkerung nach Geschlecht und + Altersgruppen (17) - Stichtag 31.12. - regionale Tiefe: Kreise und + krfr. Städte". + + Download the xlsx or csv file and put it under dd.defaultDict['out_folder'], + this normally is Memilio/data/pydata/Germany. + The folders 'pydata/Germany' have to be created if they do not exist yet. + Then this script can be run. + + @param read_data False or True. Defines if data is read from file or + downloaded. Default defined in defaultDict. + @param file_format File format which is used for writing the data. + Default defined in defaultDict. + @param out_folder Path to folder where data is written in folder + out_folder/Germany. Default defined in defaultDict. + @param merge_eisenach [Default: True] or False. Defines whether the + counties 'Wartburgkreis' and 'Eisenach' are listed separately or + combined as one entity 'Wartburgkreis'. + @param username str. 
Username to sign in at regionalstatistik.de. + @param password str. Password to sign in at regionalstatistik.de. + @return DataFrame with adjusted population data for all ages to current level. + """ + raw_df = fetch_population_data( + read_data=read_data, + out_folder=out_folder, + file_format=file_format, + username=username, + password=password, + **kwargs + ) + preprocess_df = preprocess_population_data( + df_pop_raw=raw_df, + merge_eisenach=merge_eisenach + ) + df_pop_export = write_population_data( + df_pop=preprocess_df, + file_format=file_format, + out_folder=out_folder, + merge_eisenach=True + ) return df_pop_export diff --git a/pycode/memilio-epidata/memilio/epidata/getVaccinationData.py b/pycode/memilio-epidata/memilio/epidata/getVaccinationData.py index 8299194ebe..5812970775 100644 --- a/pycode/memilio-epidata/memilio/epidata/getVaccinationData.py +++ b/pycode/memilio-epidata/memilio/epidata/getVaccinationData.py @@ -19,7 +19,8 @@ ############################################################################# import itertools import os -from datetime import datetime +from datetime import datetime, date +from typing import Tuple import numpy as np import pandas as pd @@ -38,11 +39,11 @@ def download_vaccination_data(read_data, filename, directory, interactive): - url = "https://raw.githubusercontent.com/robert-koch-institut/COVID-19-Impfungen_in_Deutschland/master/Deutschland_Landkreise_COVID-19-Impfungen.csv" path = os.path.join(directory + filename + ".json") df_data = gd.get_file(path, url, read_data, param_dict={'dtype': { - 'LandkreisId_Impfort': "string", 'Altersgruppe': "string", 'Impfschutz': int, 'Anzahl': int}}, interactive=interactive) + 'LandkreisId_Impfort': "string", 'Altersgruppe': "string", 'Impfschutz': int, 'Anzahl': int}}, + interactive=interactive) return df_data @@ -70,7 +71,7 @@ def sanity_checks(df): def compute_vaccination_ratios( age_group_list, vaccinations_table, vacc_column, region_column, population, merge_2022=True): - """! 
Computes vaccination ratios based on the number of vaccinations + """! Computes vaccination ratios based on the number of vaccinations and the corresponding population data @param age_group_list List of age groups considered. @@ -94,7 +95,7 @@ def compute_vaccination_ratios( population = geoger.merge_df_counties_all( population, sorting=[region_column], columns=region_column) - df_vacc_ratios[['r'+age for age in age_group_list] + df_vacc_ratios[['r' + age for age in age_group_list] ] = df_vacc_ratios[age_group_list] / population[age_group_list].values return df_vacc_ratios @@ -102,7 +103,7 @@ def compute_vaccination_ratios( def sanitizing_average_regions( df, to_county_map, age_groups, column_names, age_population): - """! Vaccinations in all regions are split up per population of its counties. + """! Vaccinations in all regions are split up per population of its counties. This is done by summing up all vaccinations in this region and divide this by the population ratios. This is done for every age group and number of vaccination seperately. A new dataframme is created where the new data is stored. 
@@ -143,7 +144,7 @@ def sanitizing_average_regions( # for each column: vaccinations = all vaccinations * population_ratios for column in column_names: df_age.loc[df_age[dd.EngEng['idCounty']].isin( - counties_list), column] = vacc_sums[column].values*population_ratios.values + counties_list), column] = vacc_sums[column].values * population_ratios.values df_total.append(df_age) @@ -172,12 +173,12 @@ def sanitizing_extrapolation_mobility( # compute average vaccination ratio per age group for full vaccinations aver_ratio = df.groupby(dd.EngEng['ageRKI']).agg({column_names[1]: "sum"})[ - column_names[1]].values/age_population[age_groups].sum().values + column_names[1]].values / age_population[age_groups].sum().values # compute maximum_sanitizing threshold per age group as maxmimum of country-wide ratio + 10% # and predefined maximum value; threshold from becoming larger than 1 for kk in range(len(age_groups)): - max_sanit_threshold_arr[kk] = min(1, aver_ratio[kk]+0.1) + max_sanit_threshold_arr[kk] = min(1, aver_ratio[kk] + 0.1) # create copy of dataframe df_san = df[:] @@ -185,7 +186,7 @@ def sanitizing_extrapolation_mobility( # aggregate total number of vaccinations per county and age group vacc_sums_nonsanit = df.groupby( [dd.EngEng['idCounty'], - dd.EngEng['ageRKI']]).agg( + dd.EngEng['ageRKI']]).agg( {column_names[1]: "sum"}).reset_index() # create new data frame and reshape it df_fullsum = compute_vaccination_ratios( @@ -217,18 +218,19 @@ def sanitizing_extrapolation_mobility( # federal state and age-group-specific sanitizing threshold minus # current vaccination ratio df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( - state_to_county[key]), ['cw'+age for age in age_groups]] = sanitizing_thresholds[stateidx] - df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( - state_to_county[key]), ['r'+age for age in age_groups]].values + state_to_county[key]), ['cw' + age for age in age_groups]] = sanitizing_thresholds[stateidx] - 
df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( + state_to_county[key]), ['r' + age for age in age_groups]].values # replace negative numbers by zero, i.e., take maximum of 0 and value df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( - state_to_county[key]), ['cw'+age for age in age_groups]] = df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( - state_to_county[key]), ['cw'+age for age in age_groups]].mask(df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( - state_to_county[key]), ['cw'+age for age in age_groups]] < 0, 0) + state_to_county[key]), ['cw' + age for age in age_groups]] = df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( + state_to_county[key]), ['cw' + age for age in age_groups]].mask( + df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( + state_to_county[key]), ['cw' + age for age in age_groups]] < 0, 0) # compute equally the vaccination amount that is considered to be # distributed (or that can be accepted) df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( - state_to_county[key]), ['vd'+age for age in age_groups]] = df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( + state_to_county[key]), ['vd' + age for age in age_groups]] = df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( state_to_county[key]), [age for age in age_groups]].values - sanitizing_thresholds[stateidx] * age_population.loc[age_population[dd.EngEng['idCounty']].isin( state_to_county[key]), age_groups].values @@ -251,21 +253,22 @@ def sanitizing_extrapolation_mobility( # vaccinations: access only rows which belong to neighboring # counties cap_weight = df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin( - neighbors_mobility[id][0]), ['cw'+age for age in age_groups]] + neighbors_mobility[id][0]), ['cw' + age for age in age_groups]] # iterate over age groups for ageidx in range(len(age_groups)): # check if vaccinations have to be distributed if vacc_dist[ageidx] > 1e-10: - cap_chck = np.zeros(len(neighbors_mobility[id][0]))-1 + cap_chck = 
np.zeros(len(neighbors_mobility[id][0])) - 1 chk_err_idx = 0 while (chk_err_idx == 0) or (len(np.where(cap_chck > 1e-10)[0]) > 0): neighb_cap_reached = np.where(cap_chck > -1e-10)[0] neighb_open = np.where(cap_chck <= -1e-10)[0] # maximum the neighbor takes before exceeding # the average - vacc_nshare_max = df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin(neighbors_mobility[id][0]), [ - 'vd' + age_groups[ageidx]]].reset_index().loc[:, 'vd' + age_groups[ageidx]].values + vacc_nshare_max = df_fullsum.loc[ + df_fullsum[dd.EngEng['idCounty']].isin(neighbors_mobility[id][0]), [ + 'vd' + age_groups[ageidx]]].reset_index().loc[:, 'vd' + age_groups[ageidx]].values vacc_nshare_max[vacc_nshare_max > 0] = 0 # multiply capacity weight with commuter mobility weight and divide # by sum of single products such that the sum of these weights @@ -279,8 +282,9 @@ def sanitizing_extrapolation_mobility( vacc_dist_weight[neighb_cap_reached] = abs( vacc_nshare_max[neighb_cap_reached] / vacc_dist[ageidx]) # 3th step: compute initial weights for all other counties - vacc_dist_weight[neighb_open] = neighbors_mobility[id][1][neighb_open] * cap_weight.values[neighb_open, - ageidx] / sum(neighbors_mobility[id][1][neighb_open] * cap_weight.values[neighb_open, ageidx]) + vacc_dist_weight[neighb_open] = neighbors_mobility[id][1][neighb_open] * cap_weight.values[ + neighb_open, + ageidx] / sum(neighbors_mobility[id][1][neighb_open] * cap_weight.values[neighb_open, ageidx]) # 4th step: scale according to non-distributed vaccinations vacc_dist_weight[neighb_open] = ( 1 - sum(vacc_dist_weight[neighb_cap_reached])) * vacc_dist_weight[neighb_open] @@ -293,7 +297,7 @@ def sanitizing_extrapolation_mobility( vacc_nshare_pot_denom = vacc_nshare_pot.copy() vacc_nshare_pot_denom[vacc_nshare_pot_denom == 0] = 1 cap_chck = ( - vacc_nshare_max + vacc_nshare_pot)/vacc_nshare_pot_denom + vacc_nshare_max + vacc_nshare_pot) / vacc_nshare_pot_denom chk_err_idx += 1 if chk_err_idx > 
len(neighbors_mobility[id][0]): @@ -315,30 +319,31 @@ def sanitizing_extrapolation_mobility( # sum up new additions df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin(neighbors_mobility[id][0]), [ - 'vd' + age_groups[ageidx]]] = np.array(df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin(neighbors_mobility[id][0]), [ - 'vd' + age_groups[ageidx]]]).flatten() + vacc_dist[ageidx]*vacc_dist_weight + 'vd' + age_groups[ageidx]]] = np.array( + df_fullsum.loc[df_fullsum[dd.EngEng['idCounty']].isin(neighbors_mobility[id][0]), [ + 'vd' + age_groups[ageidx]]]).flatten() + vacc_dist[ageidx] * vacc_dist_weight # iterate over neighbors and add potential share of vaccionations # to neighbor and decrease local vaccionations at the end nidx = 0 for neighb_id in neighbors_mobility[id][0][order_dist][ np.where(vacc_nshare_pot > 0)]: - df_san.loc[(df_san[dd.EngEng['idCounty']] == neighb_id) & - (df_san[dd.EngEng['ageRKI']] == - age_groups[ageidx]), - column_names] += reduc_shares[nidx] * df_san.loc[( - df_san[dd.EngEng['idCounty']] == id) & - (df_san[dd.EngEng['ageRKI']] == age_groups[ageidx]), - column_names].values + df_san.loc[ + (df_san[dd.EngEng['idCounty']] == neighb_id) + & (df_san[dd.EngEng['ageRKI']] == age_groups[ageidx]), + column_names] += (reduc_shares[nidx] + * df_san.loc[(df_san[dd.EngEng['idCounty']] == id) + & (df_san[dd.EngEng['ageRKI']] == age_groups[ageidx]), column_names] + .values + ) nidx += 1 - df_san.loc[(df_san[dd.EngEng['idCounty']] == id) & - (df_san[dd.EngEng['ageRKI']] == - age_groups[ageidx]), - column_names] -= sum(reduc_shares) * df_san.loc[( - df_san[dd.EngEng['idCounty']] == id) & - (df_san[dd.EngEng['ageRKI']] == - age_groups[ageidx]), - column_names].values + df_san.loc[(df_san[dd.EngEng['idCounty']] == id) + & (df_san[dd.EngEng['ageRKI']] == age_groups[ageidx]), + column_names] -= (sum(reduc_shares) + * df_san.loc[(df_san[dd.EngEng['idCounty']] == id) + & (df_san[dd.EngEng['ageRKI']] == age_groups[ageidx]), column_names] + .values + ) if 
len( np.where(np.isnan(df_san[column_names]) == True)[0]) > 0: @@ -366,7 +371,7 @@ def sanitizing_extrapolation_mobility( df_san.Age_RKI == age)][column_names].sum() b = df[(df.ID_County == id) & ( df.Age_RKI == age)].loc[:, column_names].iloc[-1] - if sum(a-b) > 1e-8: + if sum(a - b) > 1e-8: gd.default_print( "Error", "Cumulative sum error in: " + str(id) + " " + str(age)) ### end of to be removed ### @@ -378,11 +383,12 @@ def extrapolate_age_groups_vaccinations( df_data, population_all_ages, unique_age_groups_old, unique_age_groups_new, column_names, age_old_to_all_ages_indices, min_all_ages, all_ages_to_age_new_share): - """! Original age groups (05-11, 12-17, 18-59, 60+) are replaced by infection data age groups - (0-4, 5-14, 15-34, 35-59, 60-79, 80+). For every county the vacinations of old age groups are split to infection + """! Original age groups (05-11, 12-17, 18-59, 60+) are replaced by infection data age groups + (0-4, 5-14, 15-34, 35-59, 60-79, 80+). For every county the vacinations of old age groups are split to infection data age groups by its population ratio. For every age group and county a new dataframe is created. After the extrapolation all subframes are merged together. + @param age_old_to_all_ages_indices List. List of original ages @param df_data DataFrame with Data to compute. @param population_all_ages Dataframe with number of population for every age group and county. @param unique_age_groups_old List of original age groups. 
@@ -429,10 +435,10 @@ def extrapolate_age_groups_vaccinations( ratios = [0 for zz in range(0, len(unique_age_groups_new))] for j in age_old_to_all_ages_indices[i]: ratios[all_ages_to_age_new_share[j][0][1] - ] += float(pop_state[str(min_all_ages[j])].iloc[0])/total_pop + ] += float(pop_state[str(min_all_ages[j])].iloc[0]) / total_pop # split vaccinations in old agegroup to new agegroups for j in range(0, len(ratios)): - new_dataframe = county_age_df[column_names]*ratios[j] + new_dataframe = county_age_df[column_names] * ratios[j] new_dataframe[dd.EngEng['ageRKI']] = unique_age_groups_new[j] vacc_data_df.append(pd.concat( [info_df, new_dataframe], @@ -450,98 +456,92 @@ def extrapolate_age_groups_vaccinations( # test if number of vaccinations in current county are equal in old and new dataframe for random chosen date for vacc in column_names: if total_county_df[total_county_df[dd.EngEng['date']] == '2022-05-10'][vacc].sum() - vacc_df[vacc_df[dd.EngEng['date']] == '2022-05-10'][vacc].sum() > 1e-5: - gd.default_print("Error", - "Error in transformation...") + gd.default_print("Error", "Error in transformation...") # merge all county specific dataframes df_data_ageinf_county_cs = pd.concat(df_data_ageinf_county_cs) return df_data_ageinf_county_cs -# gets rki vaccination monitoring data for all states and extrapolates the values for counties according to their population -# Missing ratio values for the two different age groups are also estimated +# gets rki vaccination monitoring data for all states and extrapolates the values for counties according to their +# population Missing ratio values for the two different age groups are also estimated -def get_vaccination_data(read_data=dd.defaultDict['read_data'], - file_format=dd.defaultDict['file_format'], - out_folder=dd.defaultDict['out_folder'], - start_date=dd.defaultDict['start_date'], - end_date=dd.defaultDict['end_date'], - impute_dates=True, - moving_average=dd.defaultDict['moving_average'], - 
sanitize_data=dd.defaultDict['sanitize_data'], - **kwargs - ): - """! Downloads the RKI vaccination data and provides different kind of structured data. +def fetch_vaccination_data( + conf_obj, + filename: str, + directory: str, + read_data: str = dd.defaultDict['read_data'], +) -> pd.DataFrame: + """ Downloads or reads the vaccination data and writes the RKIVaccFull dataset - The data is read from the internet. - The file is read in or stored at the folder "out_folder"/Germany/. - To store and change the data we use pandas. + @param directory: str + Path to the output directory + @param filename: str + Name of the full dataset filename + @param conf_obj + configuration object + @param read_data bool True or False. Defines if data is read from file or downloaded. Default defined in defaultDict. - While working with the data - - the column names are changed to English depending on defaultDict - - The column "Date" provides information on the date of each data point given in the corresponding columns. + @return pd.DataFrame fetched vaccination data + """ + out_folder = conf_obj.path_to_use + no_raw = conf_obj.no_raw - - The data is exported in three different ways: - - all_county_vacc: Resolved per county by grouping all original age groups (05-11, 12-17, 18-59, 60+) - - all_county_agevacc_vacc: Resolved per county and original age group (05-11, 12-17, 18-59, 60+) - - all_county_ageinf_vacc: Resolved per county and infection data age group (0-4, 5-14, 15-34, 35-59, 60-79, 80+) - - To do so getPopulationData is used and age group specific date from the original source - is extrapolated on the new age groups on county level. + df_data = download_vaccination_data( + read_data, filename, directory, conf_obj.interactive) + if conf_obj.checks: + sanity_checks(df_data) - - Missing dates are imputed for all data frames ('fillDates' is not optional but always executed). - - A central moving average of N days is optional. 
+ if not no_raw: + gd.write_dataframe(df_data, directory, filename, "json") + return df_data - - Start and end dates can be provided to define the length of the returned data frames. - @param read_data [Currently not used] True or False. Defines if data is read from file or downloaded. - Here Data is always downloaded from the internet. - @param file_format File format which is used for writing the data. Default defined in defaultDict. - @param out_folder Folder where data is written to. Default defined in defaultDict. +def process_vaccination_data( + df_data: pd.DataFrame, + conf_obj, + directory: str, + file_format: str = dd.defaultDict['file_format'], + start_date: date = dd.defaultDict['start_date'], + end_date: date = dd.defaultDict['end_date'], + moving_average: int = dd.defaultDict['moving_average'], + sanitize_data: int = dd.defaultDict['sanitize_data'] +) -> dict: + """! Processes downloaded raw data + While working with the data + - the column names are changed to English depending on defaultDict + - The column "Date" provides information on the date of each data point given in the corresponding columns. + + @param df_data pd.DataFrame a Dataframe containing processed vaccination data + @param directory: str + Path to the output directory + @param conf_obj + configuration object + @param file_format str. File format which is used for writing the data. Default defined in defaultDict. @param start_date Date of first date in dataframe. Default defined in defaultDict. @param end_date Date of last date in dataframe. Default defined in defaultDict. - @param impute_dates True or False. Defines if values for dates without new information are imputed. - Here Dates are always imputed so False changes nothing. - @param moving_average Integers >=0. Applies an 'moving_average'-days moving average on all time series + @param moving_average int. Integers >=0. Applies an 'moving_average'-days moving average on all time series to smooth out effects of irregular reporting. 
Default defined in defaultDict. - @param sanitize_data Value in {0,1,2,3}; Default: 1. For many counties, - vaccination data is not correctly attributed to home locations of - vaccinated persons. If 'sanitize_data' is set to larger 0, this is - corrected. + @param sanitize_data int. Value in {0,1,2,3}; Default: 1. For many counties, + vaccination data is not correctly attributed to home locations of + vaccinated persons. If 'sanitize_data' is set to larger 0, this is corrected. 0: No sanitizing applied. 1: Averaging ratios over federal states. 2: Averaging ratios over intermediate regions. - 3: All counties with vaccination quotas of more than - 'sanitizing_threshold' will be adjusted to the average of its + 3: All counties with vaccination quotas of + more than 'sanitizing_threshold' will be adjusted to the average of its federal state and remaining vaccinations will be distributed to closely connected neighboring regions using commuter mobility networks. The sanitizing threshold will be defined by the age group-specific average on the corresponding vaccination ratios on county and federal - state level. - """ - conf = gd.Conf(out_folder, **kwargs) - out_folder = conf.path_to_use - no_raw = conf.no_raw - - # data for all dates is automatically added - if impute_dates == False: - gd.default_print( - 'Warning', 'Setting impute_dates = True as data for all dates is automatically added.') - impute_dates = True - - directory = os.path.join(out_folder, 'Germany/') - gd.check_dir(directory) - - filename = "RKIVaccFull" - - df_data = download_vaccination_data( - read_data, filename, directory, conf.interactive) + state level. 
- if conf.checks: - sanity_checks(df_data) + @return tuple and DataFrame + """ + out_folder = conf_obj.path_to_use + no_raw = conf_obj.no_raw - if not no_raw: - gd.write_dataframe(df_data, directory, filename, "json") with progress_indicator.Spinner(message='Preparing DataFrame'): df_data.rename(dd.GerEng, axis=1, inplace=True) @@ -583,14 +583,13 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], ] = df_data[dd.EngEng['idCounty']].astype(int) except ValueError: gd.default_print("Error", 'Data items in ID_County could not be converted to integer. ' - 'Imputation and/or moving_average computation will FAIL.') + 'Imputation and/or moving_average computation will FAIL.') # NOTE: the RKI vaccination table contains about # 180k 'complete' vaccinations in id 17000 Bundesressorts, which # can not be attributed to any county, so these are currently ignored! # for spatially resolved data, we remove and ignore it. df_data = df_data[df_data[dd.EngEng['idCounty']] != 17000] - # get unique age groups unique_age_groups_old = sorted(df_data[dd.EngEng['ageRKI']].unique()) @@ -615,6 +614,7 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], min_age_old.append(max_age_all) # get population data for all countys (TODO: better to provide a corresponding method for the following lines in getPopulationData itself) + try: population = pd.read_json( directory + "county_current_population.json") @@ -634,7 +634,7 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], if '-' in age: min_age_pop.append(int(age.split('-')[0])) elif '>' in age: - min_age_pop.append(int(age.split('>')[1])+1) + min_age_pop.append(int(age.split('>')[1]) + 1) elif '<' in age: min_age_pop.append(0) else: @@ -670,7 +670,7 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], # get interval indices from all age groups that correspond to old age group age_old_to_all_ages_indices = [[] - for zz in range(0, len(min_age_old)-1)] + for zz in range(0, len(min_age_old) - 
1)] for i in range(0, len(unique_age_groups_old)): for k in range(0, len(all_ages_to_age_old_share)): if all_ages_to_age_old_share[k][0][1] == i + old_age_not_vacc: @@ -681,7 +681,7 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], # get interval indices from all age groups that correspond to new age group age_new_to_all_ages_indices = [[] - for zz in range(0, len(min_age_new)-1)] + for zz in range(0, len(min_age_new) - 1)] for i in range(0, len(min_age_new)): for k in range(0, len(all_ages_to_age_new_share)): if all_ages_to_age_new_share[k][0][1] == i: @@ -713,10 +713,10 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], # TODO: a similar functionality has to be implemented as unit test if max( - abs( - population[unique_age_groups_pop].sum(axis=1) - - population_all_ages[[str(i) for i in min_all_ages]].sum( - axis=1))) > 1e-8: + abs( + population[unique_age_groups_pop].sum(axis=1) - + population_all_ages[[str(i) for i in min_all_ages]].sum( + axis=1))) > 1e-8: gd.default_print("Error", "Population does not match expectations") population_old_ages = pd.DataFrame(population[dd.EngEng['idCounty']]) @@ -725,7 +725,7 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], # is in first place start_age_data = list(population_all_ages.columns).index('0') population_old_ages[unique_age_groups_old[i]] = population_all_ages.iloc[:, np.array( - age_old_to_all_ages_indices[i])+start_age_data].sum(axis=1) + age_old_to_all_ages_indices[i]) + start_age_data].sum(axis=1) ## only for output meta information purposes ## # create hashes to access columns of new age group intervals @@ -738,7 +738,7 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], # is in first place start_age_data = list(population_all_ages.columns).index('0') population_new_ages[unique_age_groups_new[i]] = population_all_ages.iloc[:, np.array( - age_new_to_all_ages_indices[i])+start_age_data].sum(axis=1) + age_new_to_all_ages_indices[i]) + 
start_age_data].sum(axis=1) # end of output meta information purposes vacc_column_names = [ @@ -849,7 +849,7 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], df_data_agevacc_county_cs = mdfs.impute_and_reduce_df( df_data_agevacc_county_cs, {dd.EngEng['idCounty']: df_data_agevacc_county_cs[dd.EngEng['idCounty']].unique(), - dd.EngEng['ageRKI']: unique_age_groups_old}, + dd.EngEng['ageRKI']: unique_age_groups_old}, vacc_column_names, impute='forward', moving_average=moving_average, min_date=start_date, max_date=end_date) @@ -857,11 +857,98 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], spinner.stop() gd.default_print('Info', 'Sanitizing deactivated.') - if conf.plot: + dict_of_data = { + "df_data_agevacc_county_cs": df_data_agevacc_county_cs, + "vacc_column_names": vacc_column_names, + "unique_age_groups_old": unique_age_groups_old, + "population_old_ages": population_old_ages, + "extrapolate_agegroups": extrapolate_agegroups, + "population_all_ages": population_all_ages, + "unique_age_groups_new": unique_age_groups_new, + "age_old_to_all_ages_indices": age_old_to_all_ages_indices, + "population_new_ages": population_new_ages, + "all_ages_to_age_new_share": all_ages_to_age_new_share, + "min_all_ages": min_all_ages, + } + return dict_of_data + + +def write_vaccination_data(dict_data: dict, + conf_obj, + directory: str, + file_format: str = dd.defaultDict['file_format'], + impute_dates: bool = True, + moving_average: int = dd.defaultDict['moving_average'], + ): + """!
Writes the vaccination data + The data is exported in three different ways: + - all_county_vacc: Resolved per county by grouping all original age groups (05-11, 12-17, 18-59, 60+) + - all_county_agevacc_vacc: Resolved per county and original age group (05-11, 12-17, 18-59, 60+) + - all_county_ageinf_vacc: Resolved per county and infection data age group (0-4, 5-14, 15-34, 35-59, 60-79, 80+) + - To do so getPopulationData is used and age group specific date from the original source + is extrapolated on the new age groups on county level. + + - Missing dates are imputed for all data frames ('fillDates' is not optional but always executed). + - A central moving average of N days is optional. + + - Start and end dates can be provided to define the length of the returned data frames. + Parameters + ---------- + @param dict_data dict. Contains various datasets or values + - df_data_agevacc_county_cs: pd.DataFrame a Dataframe containing processed vaccination data + - vacc_column_names + - unique_age_groups_old + - population_old_ages + - extrapolate_agegroups + - population_all_ages + - unique_age_groups_new + - age_old_to_all_ages_indices + - min_all_ages + - all_ages_to_age_new_share + - population_new_ages + + @param directory: str + Path to the output directory + @param conf_obj + configuration object + @param file_format: str. File format which is used for writing the data. Default defined in defaultDict. + @param impute_dates: bool. True or False. Defines if values for dates without new information are imputed. Default defined in defaultDict. + @param moving_average: int. Integers >=0. Applies an 'moving_average'-days moving average on all time series to smooth out effects of irregular reporting. Default defined in defaultDict. + sanitize_data: Value in {0,1,2,3}; Default: 1. For many counties, vaccination data is not correctly attributed to home locations of + vaccinated persons. If 'sanitize_data' is set to larger 0, this is corrected. + 0: No sanitizing applied. 
+ 1: Averaging ratios over federal states. + 2: Averaging ratios over intermediate regions. + 3: All counties with vaccination quotas of more than 'sanitizing_threshold' will be adjusted to the average of its + federal state and remaining vaccinations will be distributed to closely connected neighboring regions using commuter mobility networks. + The sanitizing threshold will be defined by the age group-specific average on the corresponding vaccination ratios on county and federal + state level. + @return: None if the datasets are written to files, otherwise (if conf_obj.to_dataset is True) a tuple of DataFrames + """ + + df_data_agevacc_county_cs = dict_data["df_data_agevacc_county_cs"] + vacc_column_names = dict_data["vacc_column_names"] + unique_age_groups_old = dict_data["unique_age_groups_old"] + population_old_ages = dict_data["population_old_ages"] + extrapolate_agegroups = dict_data["extrapolate_agegroups"] + population_all_ages = dict_data["population_all_ages"] + unique_age_groups_new = dict_data["unique_age_groups_new"] + age_old_to_all_ages_indices = dict_data["age_old_to_all_ages_indices"] + min_all_ages = dict_data["min_all_ages"] + all_ages_to_age_new_share = dict_data["all_ages_to_age_new_share"] + population_new_ages = dict_data["population_new_ages"] + + # data for all dates is automatically added + if not impute_dates: + gd.default_print( + 'Warning', 'Setting impute_dates = True as data for all dates is automatically added.') + impute_dates = True + + if conf_obj.plot: # have a look extrapolated vaccination ratios (TODO: create plotting for production) # aggregate total number of vaccinations per county and age group latest_date = df_data_agevacc_county_cs[dd.EngEng["date"]][len( - df_data_agevacc_county_cs.index)-1].strftime("%Y-%m-%d") + df_data_agevacc_county_cs.index) - 1].strftime("%Y-%m-%d") vacc_sums_nonsanit = df_data_agevacc_county_cs.loc[( df_data_agevacc_county_cs.Date == latest_date), ['ID_County', vacc_column_names[1]]] # create new data frame and reshape it @@ -870,26 +957,14 @@ def
get_vaccination_data(read_data=dd.defaultDict['read_data'], dd.EngEng['idCounty'], population_old_ages, merge_2022=True) - # store data for all counties - filename = 'vacc_county_agevacc' - filename = gd.append_filename(filename, impute_dates, moving_average) - gd.write_dataframe(df_data_agevacc_county_cs, - directory, filename, file_format) - - # store data for all federal states: group information on date, state and age level - # (i.e., aggregate information of all counties per federal state) - filename = 'vacc_states_agevacc' - filename = gd.append_filename(filename, impute_dates, moving_average) df_data_agevacc_state_cs = df_data_agevacc_county_cs.groupby( [dd.EngEng['date'], dd.EngEng['idState'], dd.EngEng['ageRKI']]).agg( {column: "sum" for column in vacc_column_names}).reset_index() - gd.write_dataframe(df_data_agevacc_state_cs, - directory, filename, file_format) # make plot of absolute numbers original age resolution - if conf.plot: + if conf_obj.plot: # extract (dummy) date column to plt date_vals = df_data_agevacc_county_cs.loc[ (df_data_agevacc_county_cs[dd.EngEng['ageRKI']] == @@ -902,7 +977,7 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], df_data_agevacc_county_cs.loc [df_data_agevacc_county_cs[dd.EngEng['ageRKI']] == age, [dd.EngEng['date'], - dd.EngEng['vaccPartial']]].groupby(dd.EngEng['date']).sum() + dd.EngEng['vaccPartial']]].groupby(dd.EngEng['date']).sum() for age in unique_age_groups_old] customPlot.plot_multiple_series( date_vals, yvals, [age for age in unique_age_groups_old], @@ -915,7 +990,7 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], df_data_agevacc_county_cs.loc [df_data_agevacc_county_cs[dd.EngEng['ageRKI']] == age, [dd.EngEng['date'], - dd.EngEng['vaccComplete']]].groupby(dd.EngEng['date']).sum() + dd.EngEng['vaccComplete']]].groupby(dd.EngEng['date']).sum() for age in unique_age_groups_old] customPlot.plot_multiple_series( date_vals, yvals, [age for age in unique_age_groups_old], @@ -931,21 
+1006,10 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], dd.EngEng['idCounty']]).agg( {col_new: "sum" for col_new in vacc_column_names}).reset_index() - # store data for all counties - filename = 'vacc_county' - filename = gd.append_filename(filename, impute_dates, moving_average) - gd.write_dataframe(df_data_county_cs, directory, filename, file_format) - - # store data for all federal states: group information on date, state and age level - # (i.e., aggregate information of all counties per federal state) - filename = 'vacc_states' - filename = gd.append_filename(filename, impute_dates, moving_average) df_data_state_cs = df_data_county_cs.groupby( [dd.EngEng['date'], dd.EngEng['idState']]).agg( {column: "sum" for column in vacc_column_names}).reset_index() - gd.write_dataframe(df_data_state_cs, - directory, filename, file_format) ####### age resolved with extrapolation to other age groups ####### # write data frame resolved per county and age (with age classes as @@ -961,26 +1025,14 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], df_data_ageinf_county_cs.reset_index(drop=True, inplace=True) - # store data for all counties - filename = 'vacc_county_ageinf' - filename = gd.append_filename(filename, impute_dates, moving_average) - gd.write_dataframe(df_data_ageinf_county_cs, - directory, filename, file_format) - - # store data for all federal states: group information on date, state and age level - # (i.e., aggregate information of all counties per federal state) - filename = 'vacc_states_ageinf' - filename = gd.append_filename(filename, impute_dates, moving_average) df_data_ageinf_state_cs = df_data_ageinf_county_cs.groupby( [dd.EngEng['date'], dd.EngEng['idState'], dd.EngEng['ageRKI']]).agg( {column: "sum" for column in vacc_column_names}).reset_index() - gd.write_dataframe(df_data_ageinf_state_cs, - directory, filename, file_format) # make plot of relative numbers of original and extrapolated age resolution - if conf.plot: + 
if conf_obj.plot: # merge Eisenach... population_new_ages = geoger.merge_df_counties_all( population_new_ages, sorting=[dd.EngEng["idCounty"]], @@ -1066,6 +1118,150 @@ def get_vaccination_data(read_data=dd.defaultDict['read_data'], ylabel='Number', fig_name="Germany_FullVacination_AgeExtr_Absolute") + if not conf_obj.to_dataset: + # store data for all counties + filename = 'vacc_county_agevacc' + filename = gd.append_filename(filename, impute_dates, moving_average) + gd.write_dataframe(df_data_agevacc_county_cs, + directory, filename, file_format) + + # store data for all federal states: group information on date, state and age level + # (i.e., aggregate information of all counties per federal state) + filename = 'vacc_states_agevacc' + filename = gd.append_filename(filename, impute_dates, moving_average) + gd.write_dataframe(df_data_agevacc_state_cs, + directory, filename, file_format) + + # store data for all counties + filename = 'vacc_county' + filename = gd.append_filename(filename, impute_dates, moving_average) + gd.write_dataframe(df_data_county_cs, directory, filename, file_format) + + # store data for all federal states: group information on date, state and age level + # (i.e., aggregate information of all counties per federal state) + filename = 'vacc_states' + filename = gd.append_filename(filename, impute_dates, moving_average) + gd.write_dataframe(df_data_state_cs, directory, filename, file_format) + + ####### age resolved with extrapolation to other age groups ####### + # write data frame resolved per county and age (with age classes as + # provided in RKI infection tables: 0-4, 5-14, 15-34, 35-59, 60-79, 80+) + + # store data for all counties + filename = 'vacc_county_ageinf' + filename = gd.append_filename(filename, impute_dates, moving_average) + gd.write_dataframe(df_data_ageinf_county_cs, + directory, filename, file_format) + + # store data for all federal states: group information on date, state and age level + # (i.e., aggregate information of 
all counties per federal state) + filename = 'vacc_states_ageinf' + filename = gd.append_filename(filename, impute_dates, moving_average) + gd.write_dataframe(df_data_ageinf_state_cs, + directory, filename, file_format) + return None + else: + return (df_data_agevacc_county_cs, df_data_agevacc_state_cs, + df_data_county_cs, df_data_state_cs, + df_data_ageinf_county_cs, df_data_ageinf_state_cs) + + +def get_vaccination_data( + read_data: str = dd.defaultDict['read_data'], + file_format: str = dd.defaultDict['file_format'], + out_folder: str = dd.defaultDict['out_folder'], + start_date: date = dd.defaultDict['start_date'], + end_date: date = dd.defaultDict['end_date'], + moving_average: int = dd.defaultDict['moving_average'], + sanitize_data: int = dd.defaultDict['sanitize_data'], + impute_dates: bool = True, + **kwargs +): + """! Downloads the RKI vaccination data and provides different kind of structured data. + + The data is read from the internet. + The file is read in or stored at the folder "out_folder"/Germany/. + To store and change the data we use pandas. + + While working with the data + - the column names are changed to English depending on defaultDict + - The column "Date" provides information on the date of each data point given in the corresponding columns. + + - The data is exported in three different ways: + - all_county_vacc: Resolved per county by grouping all original age groups (05-11, 12-17, 18-59, 60+) + - all_county_agevacc_vacc: Resolved per county and original age group (05-11, 12-17, 18-59, 60+) + - all_county_ageinf_vacc: Resolved per county and infection data age group (0-4, 5-14, 15-34, 35-59, 60-79, 80+) + - To do so getPopulationData is used and age group specific date from the original source + is extrapolated on the new age groups on county level. + + - Missing dates are imputed for all data frames ('fillDates' is not optional but always executed). + - A central moving average of N days is optional. 
+ + - Start and end dates can be provided to define the length of the returned data frames. + + @param read_data [Currently not used] True or False. Defines if data is read from file or downloaded. + Here Data is always downloaded from the internet. + @param file_format File format which is used for writing the data. Default defined in defaultDict. + @param out_folder Folder where data is written to. Default defined in defaultDict. + @param start_date Date of first date in dataframe. Default defined in defaultDict. + @param end_date Date of last date in dataframe. Default defined in defaultDict. + @param moving_average Integers >=0. Applies an 'moving_average'-days moving average on all time series + to smooth out effects of irregular reporting. Default defined in defaultDict. + @param sanitize_data: Value in {0,1,2,3}; Default: 1. For many counties, + vaccination data is not correctly attributed to home locations of + vaccinated persons. If 'sanitize_data' is set to larger 0, this is + corrected. + 0: No sanitizing applied. + 1: Averaging ratios over federal states. + 2: Averaging ratios over intermediate regions. + 3: All counties with vaccination quotas of more than + 'sanitizing_threshold' will be adjusted to the average of its + federal state and remaining vaccinations will be distributed to + closely connected neighboring regions using commuter mobility networks. + The sanitizing threshold will be defined by the age group-specific + average on the corresponding vaccination ratios on county and federal + state level. + @param impute_dates bool True or False. Defines if values for dates without new information are imputed. Default defined in defaultDict. + @param to_dataset bool True or False. Whether to return the dataframe as an object instead of json file. + If True - returns objects with dataframes + If False - write dataframes into files + Default defined in defaultDict. 
+ + @return None, or the tuple of DataFrames produced by write_vaccination_data if to_dataset is True + """ + conf = gd.Conf(out_folder, **kwargs) + out_folder = conf.path_to_use + + directory = os.path.join(out_folder, 'Germany/') + gd.check_dir(directory) + + filename = "RKIVaccFull" + raw_df = fetch_vaccination_data( + conf_obj=conf, + filename=filename, + directory=directory, + read_data=read_data, + ) + process_dict_df = process_vaccination_data( + df_data=raw_df, + conf_obj=conf, + directory=directory, + start_date=start_date, + end_date=end_date, + file_format=file_format, + moving_average=moving_average, + sanitize_data=sanitize_data + ) + silver_datasets = write_vaccination_data(dict_data=process_dict_df, + conf_obj=conf, + directory=directory, + file_format=file_format, + impute_dates=impute_dates, + moving_average=moving_average, + ) + if conf.to_dataset: + return silver_datasets + def main(): """! Main program entry.""" @@ -1075,5 +1271,5 @@ def main(): if __name__ == "__main__": main() diff --git a/pycode/memilio-epidata/memilio/epidata_test/test_epidata_get_divi_data.py b/pycode/memilio-epidata/memilio/epidata_test/test_epidata_get_divi_data.py index 1574fb4107..fb825765ec 100644 --- a/pycode/memilio-epidata/memilio/epidata_test/test_epidata_get_divi_data.py +++ b/pycode/memilio-epidata/memilio/epidata_test/test_epidata_get_divi_data.py @@ -77,7 +77,7 @@ def test_get_divi_data_prints(self, mock_print, mock_file, mock_san): 'Warning: First data available on 2020-04-24. You asked for 2020-01-01. 
Changed it to 2020-04-24.')] gdd_calls = self.gdd_calls(text='') expected_calls = expected_call + gdd_calls - mock_print.assert_has_calls(expected_calls) + mock_print.assert_has_calls(expected_calls, any_order=True) mock_san.assert_has_calls([call(self.df_test)]) @patch('memilio.epidata.getDIVIData.divi_data_sanity_checks') @@ -86,7 +86,11 @@ def test_get_divi_data_prints(self, mock_print, mock_file, mock_san): def test_get_divi_data(self, mock_print, mock_file, mock_san): mock_file.return_value = self.df_test # test case with standard parameters - (df, df_county, df_states, df_ger) = gdd.get_divi_data(out_folder=self.path) + datasets = gdd.get_divi_data(out_folder=self.path) + df = datasets['raw_data'] + df_county = datasets['counties'] + df_states = datasets['states'] + df_ger = datasets['Germany'] mock_san.assert_has_calls([call(self.df_test)]) pd.testing.assert_frame_equal(df, self.df_test) self.assertEqual( @@ -122,8 +126,11 @@ def test_get_divi_data(self, mock_print, mock_file, mock_san): def test_gdd_ma(self, mock_print, mock_file, mock_san): mock_file.return_value = self.df_test # test case with moving average - (df, df_county, df_states, df_ger) = gdd.get_divi_data( - out_folder=self.path, moving_average=3) + datasets = gdd.get_divi_data(out_folder=self.path, moving_average=3) + df = datasets['raw_data'] + df_county = datasets['counties'] + df_states = datasets['states'] + df_ger = datasets['Germany'] mock_san.assert_has_calls([call(self.df_test)]) pd.testing.assert_frame_equal(df, self.df_test) self.assertAlmostEqual( @@ -159,8 +166,11 @@ def test_gdd_ma(self, mock_print, mock_file, mock_san): def test_gdd_all_dates(self, mock_print, mock_file, mock_san): mock_file.return_value = self.df_test.copy() # test case with impute dates is True - (df, df_county, df_states, df_ger) = gdd.get_divi_data( - out_folder=self.path, impute_dates=True) + datasets = gdd.get_divi_data(out_folder=self.path, impute_dates=True) + df = datasets['raw_data'] + df_county = 
datasets['counties'] + df_states = datasets['states'] + df_ger = datasets['Germany'] # Test if sanity check was called self.assertTrue(mock_san.called) pd.testing.assert_frame_equal(df, self.df_test)