From aa48ad636e6b4f3156134edc3fef7bc4cad1caa0 Mon Sep 17 00:00:00 2001
From: Timothy Hodson <34148978+thodson-usgs@users.noreply.github.com>
Date: Fri, 30 Aug 2024 08:41:07 -0500
Subject: [PATCH] Rename, fix, and extend NAWQA (NWQN) demo (#153)

* Rename, fix, and extend NAWQA (NWQN) demo

---
 demos/nawqa_data_pull/lithops.yaml            |  14 --
 .../retrieve_nawqa_with_lithops.py            | 112 -----------
 .../Dockerfile_dataretrieval                  |   0
 .../README.md                                 |  31 ++--
 demos/nwqn_data_pull/lithops.yaml             |  15 ++
 .../requirements.txt                          |   0
 demos/nwqn_data_pull/retrieve_nwqn_samples.py | 174 ++++++++++++++++++
 .../retrieve_nwqn_streamflow.py               | 137 ++++++++++++++
 8 files changed, 345 insertions(+), 138 deletions(-)
 delete mode 100644 demos/nawqa_data_pull/lithops.yaml
 delete mode 100644 demos/nawqa_data_pull/retrieve_nawqa_with_lithops.py
 rename demos/{nawqa_data_pull => nwqn_data_pull}/Dockerfile_dataretrieval (100%)
 rename demos/{nawqa_data_pull => nwqn_data_pull}/README.md (51%)
 create mode 100644 demos/nwqn_data_pull/lithops.yaml
 rename demos/{nawqa_data_pull => nwqn_data_pull}/requirements.txt (100%)
 create mode 100644 demos/nwqn_data_pull/retrieve_nwqn_samples.py
 create mode 100644 demos/nwqn_data_pull/retrieve_nwqn_streamflow.py

diff --git a/demos/nawqa_data_pull/lithops.yaml b/demos/nawqa_data_pull/lithops.yaml
deleted file mode 100644
index 2681ffc..0000000
--- a/demos/nawqa_data_pull/lithops.yaml
+++ /dev/null
@@ -1,14 +0,0 @@
-lithops:
-  backend: aws_lambda
-  storage: aws_s3
-
-aws:
-  region: us-west-2
-
-aws_lambda:
-  execution_role: arn:aws:iam::807615458658:role/lambdaLithopsExecutionRole
-  runtime: dataretrieval-runtime
-  runtime_memory: 2000
-
-aws_s3:
-  bucket: arn:aws:s3:::cubed-thodson-temp
diff --git a/demos/nawqa_data_pull/retrieve_nawqa_with_lithops.py b/demos/nawqa_data_pull/retrieve_nawqa_with_lithops.py
deleted file mode 100644
index 9f0926e..0000000
--- a/demos/nawqa_data_pull/retrieve_nawqa_with_lithops.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# Retrieve data from the National Water Quality Assessment Program (NAWQA)
-
-import lithops
-import math
-import os
-import pandas as pd
-
-from dataretrieval import nldi, nwis, wqp
-
-DESTINATION_BUCKET = os.environ.get('DESTINATION_BUCKET')
-PROJECT = "National Water Quality Assessment Program (NAWQA)"
-
-
-def map_retrieval(site):
-    """Map function to pull data from NWIS and WQP"""
-    site_list = find_neighboring_sites(site)
-    # reformat for wqp
-    site_list = [f"USGS-{site}" for site in site_list]
-
-    df, _ = wqp.get_results(siteid=site_list,
-                            project=PROJECT,
-                            )
-
-    # merge sites
-    df['MonitoringLocationIdentifier'] = f"USGS-{site}"
-
-    if len(df) != 0:
-        df.astype(str).to_parquet(f's3://{DESTINATION_BUCKET}/nwqn-samples.parquet',
-                                  engine='pyarrow',
-                                  partition_cols=['MonitoringLocationIdentifier'],
-                                  compression='zstd')
-        # optionally, `return df` for further processing
-
-
-def find_neighboring_sites(site, search_factor=0.05):
-    """Find sites upstream and downstream of the given site within a certain distance.
-
-    Parameters
-    ----------
-    site : str
-        8-digit site number.
-    search_factor : float, optional
-    """
-    site_df, _ = nwis.get_info(sites=site)
-    drain_area_sq_mi = site_df["drain_area_va"].values[0]
-    length = _estimate_watershed_length_km(drain_area_sq_mi)
-    search_distance = length * search_factor
-    # clip between 1 and 9999km
-    search_distance = max(1.0, min(9999.0, search_distance))
-
-    upstream_gdf = nldi.get_features(
-        feature_source="WQP",
-        feature_id=f"USGS-{site}",
-        navigation_mode="UM",
-        distance=search_distance,
-        data_source="nwissite",
-    )
-
-    downstream_gdf = nldi.get_features(
-        feature_source="WQP",
-        feature_id=f"USGS-{site}",
-        navigation_mode="DM",
-        distance=search_distance,
-        data_source="nwissite",
-    )
-
-    features = pd.concat([upstream_gdf, downstream_gdf], ignore_index=True)
-
-    df, _ = nwis.get_info(sites=list(features.identifier.str.strip('USGS-')))
-    # drop sites with disimilar different drainage areas
-    df = df.where(
-        (df["drain_area_va"] / drain_area_sq_mi) > search_factor,
-    ).dropna(how="all")
-
-    return df["site_no"].to_list()
-
-
-def _estimate_watershed_length_km(drain_area_sq_mi):
-    """Estimate the diameter assuming a circular watershed.
-
-    Parameters
-    ----------
-    drain_area_sq_mi : float
-        The drainage area in square miles.
-
-    Returns
-    -------
-    float
-        The diameter of the watershed in kilometers.
-    """
-    # assume a circular watershed
-    length_miles = 2 * (drain_area_sq_mi / math.pi) ** 0.5
-    # convert to km
-    return length_miles * 1.60934
-
-
-if __name__ == "__main__":
-    project = "National Water Quality Assessment Program (NAWQA)"
-
-    site_df = pd.read_csv(
-        'NWQN_sites.csv',
-        comment='#',
-        dtype={'SITE_QW_ID': str, 'SITE_FLOW_ID': str},
-    )
-
-    site_list = site_df['SITE_QW_ID'].to_list()
-    # site_list = site_list[:4] # prune for testing
-
-    fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
-    futures = fexec.map(map_retrieval, site_list)
-
-    futures.get_result()
diff --git a/demos/nawqa_data_pull/Dockerfile_dataretrieval b/demos/nwqn_data_pull/Dockerfile_dataretrieval
similarity index 100%
rename from demos/nawqa_data_pull/Dockerfile_dataretrieval
rename to demos/nwqn_data_pull/Dockerfile_dataretrieval
diff --git a/demos/nawqa_data_pull/README.md b/demos/nwqn_data_pull/README.md
similarity index 51%
rename from demos/nawqa_data_pull/README.md
rename to demos/nwqn_data_pull/README.md
index 141dda1..c3f6171 100644
--- a/demos/nawqa_data_pull/README.md
+++ b/demos/nwqn_data_pull/README.md
@@ -1,9 +1,14 @@
-# Retrieva data from the National Water Quality Assessment Program (NAWQA)
+# Retrieve data from the National Water Quality Network (NWQN)
 
-This examples walks through using lithops to retrieve data from every NAWQA
-monitoring site, then writes the results to a parquet files on s3. Each
-retrieval also searches the NLDI for neighboring sites with NAWQA data and
-merges those data assuming the monitoring site was relocated.
+> This usage example is for demonstration and not for research or
+> operational use.
+
+This example uses Lithops to retrieve data from every NWQN
+monitoring site, then writes the results to Parquet files on S3. Each
+retrieval also searches the NLDI for neighboring sites with NWQN data and
+merges those data. In the streamflow retrieval, the neighborhood search
+progressively fills in gaps in the record by taking data from the
+nearest streamgage and rescaling it by the drainage area ratio.
 
 1. Set up a Python environment
 ```bash
 conda create -n dataretrieval-lithops -y python=3.11
 conda activate dataretrieval-lithops
 pip install -r requirements.txt
 ```
 
-1. Configure compute and storage backends for [lithops](https://lithops-cloud.github.io/docs/source/configuration.html).
+2. Configure compute and storage backends for [lithops](https://lithops-cloud.github.io/docs/source/configuration.html).
 The configuration in `lithops.yaml` uses AWS Lambda for [compute](https://lithops-cloud.github.io/docs/source/compute_config/aws_lambda.html)
 and AWS S3 for [storage](https://lithops-cloud.github.io/docs/source/storage_config/aws_s3.html).
 To use those backends, simply edit `lithops.yaml` with your `bucket` and `execution_role`.
 
-1. Build a runtime image for Cubed
+3. Build a runtime image for Lithops
 ```bash
 export LITHOPS_CONFIG_FILE=$(pwd)/lithops.yaml
 lithops runtime build -b aws_lambda -f Dockerfile_dataretrieval dataretrieval-runtime
 ```
 
-1. Download site list
+4. Download the site list from ScienceBase using `wget`, or navigate to the
+URL and copy the CSV into `nwqn_data_pull/`.
 ```bash
 wget https://www.sciencebase.gov/catalog/file/get/655d2063d34ee4b6e05cc9e6?f=__disk__b3%2F3e%2F5b%2Fb33e5b0038f004c2a48818d0fcc88a0921f3f689 -O NWQN_sites.csv
 ```
 
-1. Create a s3 bucket for the output, then set it as an environmental variable
+5. Create an S3 bucket for the output, then set it as an environment variable
 ```bash
 export DESTINATION_BUCKET=
 ```
 
-1. Run the script
+6. Run the scripts
 ```bash
-python retrieve_nawqa_with_lithops.py
+python retrieve_nwqn_samples.py
+
+python retrieve_nwqn_streamflow.py
 ```
 
 ## Cleaning up
-To rebuild the Litops image, delete the existing one by running
+To rebuild the Lithops image, delete the existing one by running
 ```bash
 lithops runtime delete -b aws_lambda -d dataretrieval-runtime
 ```
diff --git a/demos/nwqn_data_pull/lithops.yaml b/demos/nwqn_data_pull/lithops.yaml
new file mode 100644
index 0000000..9474a24
--- /dev/null
+++ b/demos/nwqn_data_pull/lithops.yaml
@@ -0,0 +1,15 @@
+lithops:
+  backend: aws_lambda
+  storage: aws_s3
+
+aws:
+  region: us-west-2
+
+aws_lambda:
+  execution_role: arn:aws:iam::account-id:role/lambdaLithopsExecutionRole
+  runtime: dataretrieval-runtime
+  runtime_memory: 1024
+  runtime_timeout: 900
+
+aws_s3:
+  bucket: arn:aws:s3:::the-name-of-your-bucket
diff --git a/demos/nawqa_data_pull/requirements.txt b/demos/nwqn_data_pull/requirements.txt
similarity index 100%
rename from demos/nawqa_data_pull/requirements.txt
rename to demos/nwqn_data_pull/requirements.txt
diff --git a/demos/nwqn_data_pull/retrieve_nwqn_samples.py b/demos/nwqn_data_pull/retrieve_nwqn_samples.py
new file mode 100644
index 0000000..ceb8ff5
--- /dev/null
+++ b/demos/nwqn_data_pull/retrieve_nwqn_samples.py
@@ -0,0 +1,174 @@
+# Retrieve data from the National Water Quality Assessment Program (NAWQA)
+
+import lithops
+import math
+import os
+import pandas as pd
+
+from random import randint
+from time import sleep
+from dataretrieval import nldi, nwis, wqp
+
+DESTINATION_BUCKET = os.environ.get('DESTINATION_BUCKET')
+PROJECT = "National Water Quality Assessment Program (NAWQA)"
+# some sites are not found in NLDI, avoid them for now
+NOT_FOUND_SITES = [
+    "15565447",  # "USGS-"
+    "15292700",
+]
+BAD_GEOMETRY_SITES = [
+    "06805500",
+    "09306200",
+]
+
+BAD_NLDI_SITES = NOT_FOUND_SITES + BAD_GEOMETRY_SITES
+
+
+def map_retrieval(site):
+    """Map function to pull data from NWIS and WQP"""
+    print(f"Retrieving samples from site {site}")
+    # skip bad sites
+    if site in BAD_NLDI_SITES:
+        site_list = [site]
+    # else query slowly
+    else:
+        sleep(randint(0, 5))
+        site_list = find_neighboring_sites(site)
+
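+    # WQP expects agency-prefixed identifiers, e.g. "01646500" becomes
+    # "USGS-01646500", whereas find_neighboring_sites returns bare
+    # 8-digit NWIS site numbers.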
+    # reformat for wqp
+    site_list = [f"USGS-{site}" for site in site_list]
+
+    df, _ = wqp_get_results(siteid=site_list,
+                            project=PROJECT,
+                            )
+
+    try:
+        # merge sites
+        df['MonitoringLocationIdentifier'] = f"USGS-{site}"
+        df.astype(str).to_parquet(f's3://{DESTINATION_BUCKET}/nwqn-samples.parquet',
+                                  engine='pyarrow',
+                                  partition_cols=['MonitoringLocationIdentifier'],
+                                  compression='zstd')
+        # optionally, `return df` for further processing
+
+    except Exception as e:
+        print(f"No samples returned from site {site}: {e}")
+
+
+def exponential_backoff(max_retries=5, base_delay=1):
+    """Exponential backoff decorator with configurable retries and base delay"""
+    def decorator(func):
+        def wrapper(*args, **kwargs):
+            attempts = 0
+            while True:
+                try:
+                    return func(*args, **kwargs)
+                except Exception as e:
+                    attempts += 1
+                    if attempts > max_retries:
+                        raise e
+                    wait_time = base_delay * (2 ** attempts)
+                    print(f"Retrying in {wait_time} seconds...")
+                    sleep(wait_time)
+        return wrapper
+    return decorator
+
+
+@exponential_backoff(max_retries=5, base_delay=1)
+def nwis_get_info(*args, **kwargs):
+    return nwis.get_info(*args, **kwargs)
+
+
+@exponential_backoff(max_retries=5, base_delay=1)
+def wqp_get_results(*args, **kwargs):
+    return wqp.get_results(*args, **kwargs)
+
+
+@exponential_backoff(max_retries=3, base_delay=1)
+def find_neighboring_sites(site, search_factor=0.1, fudge_factor=3.0):
+    """Find sites upstream and downstream of the given site within a certain distance.
+
+    TODO: Use geoconnex to determine mainstem length.
+
+    Parameters
+    ----------
+    site : str
+        8-digit site number.
+    search_factor : float, optional
+        The factor by which to multiply the watershed length to determine the
+        search distance.
+    fudge_factor : float, optional
+        An additional fudge factor to apply to the search distance, because
+        watersheds are not circular.
+    """
+    site_df, _ = nwis_get_info(sites=site)
+    drain_area_sq_mi = site_df["drain_area_va"].values[0]
+    length = _estimate_watershed_length_km(drain_area_sq_mi)
+    search_distance = length * search_factor * fudge_factor
+    # clip between 1 and 9999 km
+    search_distance = max(1.0, min(9999.0, search_distance))
+
+    # get upstream and downstream sites
+    gdfs = [
+        nldi.get_features(
+            feature_source="WQP",
+            feature_id=f"USGS-{site}",
+            navigation_mode=mode,
+            distance=search_distance,
+            data_source="nwissite",
+        )
+        for mode in ["UM", "DM"]  # upstream and downstream
+    ]
+
+    features = pd.concat(gdfs, ignore_index=True)
+
+    df, _ = nwis_get_info(sites=list(features.identifier.str.strip('USGS-')))
+    # drop sites with dissimilar drainage areas
+    df = df.where(
+        (df["drain_area_va"] / drain_area_sq_mi) > search_factor,
+    ).dropna(how="all")
+
+    site_list = df["site_no"].to_list()
+
+    # include the original search site among the neighbors
+    if site not in site_list:
+        site_list.append(site)
+
+    return site_list
+
+
+def _estimate_watershed_length_km(drain_area_sq_mi):
+    """Estimate the diameter assuming a circular watershed.
+
+    Parameters
+    ----------
+    drain_area_sq_mi : float
+        The drainage area in square miles.
+
+    Returns
+    -------
+    float
+        The diameter of the watershed in kilometers.
+    """
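+    # For example, drain_area_sq_mi = 100 gives
+    # length_miles = 2 * (100 / pi) ** 0.5, about 11.3 miles or 18.2 km.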
+ """ + # assume a circular watershed + length_miles = 2 * (drain_area_sq_mi / math.pi) ** 0.5 + # convert from miles to km + return length_miles * 1.60934 + + +if __name__ == "__main__": + project = "National Water Quality Assessment Program (NAWQA)" + + site_df = pd.read_csv( + 'NWQN_sites.csv', + comment='#', + dtype={'SITE_QW_ID': str, 'SITE_FLOW_ID': str}, + ) + + site_list = site_df['SITE_QW_ID'].to_list() + #site_list = site_list[:2] # prune for testing + + fexec = lithops.FunctionExecutor(config_file="lithops.yaml") + futures = fexec.map(map_retrieval, site_list) + + futures.get_result() diff --git a/demos/nwqn_data_pull/retrieve_nwqn_streamflow.py b/demos/nwqn_data_pull/retrieve_nwqn_streamflow.py new file mode 100644 index 0000000..30eed0e --- /dev/null +++ b/demos/nwqn_data_pull/retrieve_nwqn_streamflow.py @@ -0,0 +1,137 @@ +# Retrieve data from the National Water Quality Assessment Program (NAWQA) + +import lithops +import os +import numpy as np +import pandas as pd + + +from dataretrieval import nwis +from random import randint +from time import sleep + +from retrieve_nwqn_samples import find_neighboring_sites, BAD_NLDI_SITES + +DESTINATION_BUCKET = os.environ.get('DESTINATION_BUCKET') +START_DATE = "1991-01-01" +END_DATE = "2023-12-31" + +def map_retrieval(site): + """Map function to pull data from NWIS and WQP""" + print(f"Retrieving daily streamflow from site {site}") + + if site in BAD_NLDI_SITES: + site_list = [site] + # else query slowly + else: + sleep(randint(0, 5)) + site_list = find_neighboring_sites(site) + + df, _ = nwis.get_dv( + sites=site_list, + start=START_DATE, + end=END_DATE, + parameterCd="00060", + ) + + # by default, site_no is not in the index if a single site is queried + if "site_no" in df.columns: + index_name = df.index.names[0] + df.set_index(["site_no", df.index], inplace=True) + df.index.names = ["site_no", index_name] + + print(len(df), "records retrieved") + # process the results + if not df.empty: + # drop rows with missing values; neglect other 00060_* columns + df = df.dropna(subset=["00060_Mean"]) + # fill missing codes to enable string operations + df["00060_Mean_cd"] = df["00060_Mean_cd"].fillna("M") + df = df[df["00060_Mean_cd"].str.contains("A")] + df['00060_Mean'] = df['00060_Mean'].replace(-999999, np.nan) + + site_info, _ = nwis.get_info(sites=site_list) + # USACE sites may have same site_no, which creates index conflicts later + site_info = site_info[site_info["agency_cd"] == "USGS"] # keep only USGS sites + site_info = site_info.set_index("site_no") + + main_site = site_info.loc[site] + main_site_drainage_area = main_site["drain_area_va"] + + # compute fraction of drainage area + site_info = site_info[["drain_area_va"]].copy() + site_info["drain_fraction"] = site_info["drain_area_va"] / main_site_drainage_area + site_info["fraction_diff"] = np.abs(1 - site_info["drain_fraction"]) + + # apply drainage area fraction + df = pd.merge(df, site_info, left_index=True, right_index=True) + df["00060_Mean"] *= site_info.loc[df.index.get_level_values("site_no"), "drain_fraction"].values + + # order sites by the difference in drainage area fraction + fill_order = site_info.sort_values("fraction_diff", ascending=True) + fill_order = fill_order.index.values + + flow_sites = df.index.get_level_values("site_no").values + fill_order = set(fill_order).intersection(flow_sites) + + output = pd.DataFrame() + + # loop through sites and fill in missing flow values + # going from most to least-similar drainage areas. 
+        for fill_site in fill_order:
+            fill_data = df.loc[fill_site]
+            output = update_dataframe(output, fill_data)
+
+        output = output.drop(columns=["drain_area_va", "drain_fraction", "fraction_diff"])
+        output["site_no"] = site
+
+    else:
+        print(f"No data retrieved for site {site}")
+        return
+
+    try:
+        # merge sites
+        output.astype(str).to_parquet(f's3://{DESTINATION_BUCKET}/nwqn-streamflow.parquet',
+                                      engine='pyarrow',
+                                      partition_cols=['site_no'],
+                                      compression='zstd')
+        # optionally, `return output` for further processing
+
+    except Exception as e:
+        print(f"Failed to write parquet: {e}")
+
+
+def update_dataframe(
+    original_df: pd.DataFrame,
+    new_df: pd.DataFrame,
+    overwrite: bool = False,
+) -> pd.DataFrame:
+    """Update a DataFrame with rows from another DataFrame.
+
+    Only rows whose index values are absent from original_df are appended;
+    the overwrite flag is accepted but not yet implemented.
+    NOTE: this function does not handle MultiIndex DataFrames.
+    """
+    # Identify new rows in new_df that are not in original_df
+    new_rows = new_df[~new_df.index.isin(original_df.index)]
+
+    # Concatenate new rows to original_df
+    original_df = pd.concat([original_df, new_rows]).sort_index()
+
+    return original_df
+
+
+if __name__ == "__main__":
+    project = "National Water Quality Assessment Program (NAWQA)"
+
+    site_df = pd.read_csv(
+        'NWQN_sites.csv',
+        comment='#',
+        dtype={'SITE_QW_ID': str, 'SITE_FLOW_ID': str},
+    )
+
+    site_list = site_df['SITE_QW_ID'].to_list()
+    # site_list = site_list[:4]  # prune for testing
+
+    fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
+    futures = fexec.map(map_retrieval, site_list)
+
+    futures.get_result()
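As a minimal sketch of the drainage-area-ratio gap filling implemented in
`retrieve_nwqn_streamflow.py` above (the dates, areas, and flow values here
are illustrative, not real data):

```python
import pandas as pd

# daily mean flow at the main site (one missing day) and at a neighbor
days = pd.date_range("2000-01-01", periods=3)
main = pd.Series([10.0, None, 12.0], index=days)
neighbor = pd.Series([9.0, 9.5, 11.0], index=days)

# rescale the neighbor to the main site by the drainage-area ratio:
# Q_main is approximately Q_neighbor * (A_main / A_neighbor)
area_main, area_neighbor = 1000.0, 900.0  # square miles
scaled = neighbor * (area_main / area_neighbor)

# fill gaps in the main record with the rescaled neighbor record;
# with several neighbors, fill from the most-similar drainage area first
filled = main.fillna(scaled)
print(filled)  # the missing day becomes 9.5 * (1000 / 900), about 10.56
```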