Rename, fix, and extend NAWQA (NWQN) demo #153

Merged
merged 7 commits on Aug 30, 2024
14 changes: 0 additions & 14 deletions demos/nawqa_data_pull/lithops.yaml

This file was deleted.

112 changes: 0 additions & 112 deletions demos/nawqa_data_pull/retrieve_nawqa_with_lithops.py

This file was deleted.

31 changes: 19 additions & 12 deletions demos/nawqa_data_pull/README.md → demos/nwqn_data_pull/README.md
@@ -1,9 +1,14 @@
# Retrieva data from the National Water Quality Assessment Program (NAWQA)
# Retrieve data from the National Water Quality Network (NWQN)

This examples walks through using lithops to retrieve data from every NAWQA
monitoring site, then writes the results to a parquet files on s3. Each
retrieval also searches the NLDI for neighboring sites with NAWQA data and
merges those data assuming the monitoring site was relocated.
> This usage example is for demonstration and not for research or
> operational use.

This example uses Lithops to retrieve data from every NWQN
monitoring site, then writes the results to Parquet files on S3. Each
retrieval also searches the NLDI for neighboring sites with NWQN data and
merges those data. In the streamflow retrieval, the neighborhood search
progressively fills in gaps in the record by taking data from the
nearest streamgage and rescaling it by the drainage-area ratio.
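
The drainage-area-ratio transfer is the key assumption in that gap-filling step: flow at the target site is approximated by the neighbor's flow scaled by the ratio of the two drainage areas. A minimal sketch of the idea (illustrative only; the function and variable names are not from this demo's code):

```python
import pandas as pd

def transfer_flow(neighbor_flow: pd.Series,
                  neighbor_area_sq_mi: float,
                  target_area_sq_mi: float) -> pd.Series:
    """Approximate flow at a target site from a neighboring streamgage
    by scaling with the drainage-area ratio (a common heuristic)."""
    return neighbor_flow * (target_area_sq_mi / neighbor_area_sq_mi)

# fill gaps in the target record with the rescaled neighbor record, e.g.
# target_flow = target_flow.fillna(transfer_flow(neighbor_flow, 150.0, 120.0))
```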

1. Set up a Python environment
```bash
@@ -12,33 +17,35 @@ conda activate dataretrieval-lithops
pip install -r requirements.txt
```

1. Configure compute and storage backends for [lithops](https://lithops-cloud.github.io/docs/source/configuration.html).
2. Configure compute and storage backends for [lithops](https://lithops-cloud.github.io/docs/source/configuration.html).
The configuration in `lithops.yaml` uses AWS Lambda for [compute](https://lithops-cloud.github.io/docs/source/compute_config/aws_lambda.html) and AWS S3 for [storage](https://lithops-cloud.github.io/docs/source/storage_config/aws_s3.html).
To use those backends, simply edit `lithops.yaml` with your `bucket` and `execution_role`.

1. Build a runtime image for Cubed
3. Build a runtime image for Cubed
```bash
export LITHOPS_CONFIG_FILE=$(pwd)/lithops.yaml
lithops runtime build -b aws_lambda -f Dockerfile_dataretrieval dataretrieval-runtime
```

1. Download site list
4. Download the site list from ScienceBase using `wget`, or navigate to the URL in a browser and save the CSV into `nwqn_data_pull/` (a Python alternative is sketched below).
```bash
wget https://www.sciencebase.gov/catalog/file/get/655d2063d34ee4b6e05cc9e6?f=__disk__b3%2F3e%2F5b%2Fb33e5b0038f004c2a48818d0fcc88a0921f3f689 -O NWQN_sites.csv
```
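
If `wget` isn't available on your system (see the review thread below), the same file can be fetched with Python's standard library; a sketch using the URL above:

```python
from urllib.request import urlretrieve

url = ("https://www.sciencebase.gov/catalog/file/get/655d2063d34ee4b6e05cc9e6"
       "?f=__disk__b3%2F3e%2F5b%2Fb33e5b0038f004c2a48818d0fcc88a0921f3f689")
urlretrieve(url, "NWQN_sites.csv")  # saves the CSV to the working directory
```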

1. Create a s3 bucket for the output, then set it as an environmental variable
5. Create an S3 bucket for the output, then set it as an environment variable
```bash
export DESTINATION_BUCKET=<path/to/bucket>
```

1. Run the script
6. Run the scripts
Collaborator:
I can't seem to comment on unchanged lines, but this refers to line 27: I didn't know I needed to download wget (either in bash or pip install via python) before downloading the sciencebase data using that method. Add a note about it, perhaps.

Collaborator (Author):
Hmm. That will be system-dependent, but I noted that alternatively you can navigate to the url to download the file.

```bash
python retrieve_nawqa_with_lithops.py
python retrieve_nwqn_samples.py

python retrieve_nwqn_streamflow.py
```
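
After the runs complete, the partitioned Parquet output can be read back for analysis. A minimal sketch (assumes `pandas`, `pyarrow`, and `s3fs` are installed; the example site ID is illustrative, not necessarily in the NWQN list):

```python
import os
import pandas as pd

bucket = os.environ["DESTINATION_BUCKET"]
# read a single site's partition rather than the whole dataset
samples = pd.read_parquet(
    f"s3://{bucket}/nwqn-samples.parquet",
    filters=[("MonitoringLocationIdentifier", "=", "USGS-01646500")],
)
print(samples.head())
```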

## Cleaning up
To rebuild the Litops image, delete the existing one by running
Collaborator:
Small typo: lithops

Collaborator:
☝️

To rebuild the Lithops image, delete the existing one by running
```bash
lithops runtime delete -b aws_lambda -d dataretrieval-runtime
```
15 changes: 15 additions & 0 deletions demos/nwqn_data_pull/lithops.yaml
Collaborator:
Did not test this functionality. Might make sense to ask someone with an AWS role to try out the lithops piece.

@@ -0,0 +1,15 @@
```yaml
lithops:
    backend: aws_lambda
    storage: aws_s3

aws:
    region: us-west-2

aws_lambda:
    execution_role: arn:aws:iam::account-id:role/lambdaLithopsExecutionRole
    runtime: dataretrieval-runtime
    runtime_memory: 1024
    runtime_timeout: 900

aws_s3:
    bucket: arn:aws:s3:::the-name-of-your-bucket
```
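
As the comment above notes, the Lambda piece wasn't tested in review. Before a full run, a small smoke test can confirm the backends are wired up; a sketch using the standard Lithops API (the `hello` function is a stand-in):

```python
import lithops

def hello(name):
    return f"Hello, {name}!"

# uses the aws_lambda/aws_s3 backends configured in lithops.yaml
fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
future = fexec.call_async(hello, "NWQN")
print(future.result())  # "Hello, NWQN!"
```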
174 changes: 174 additions & 0 deletions demos/nwqn_data_pull/retrieve_nwqn_samples.py
@@ -0,0 +1,174 @@
```python
# Retrieve data from the National Water Quality Assessment Program (NAWQA)

import lithops
import math
import os
import pandas as pd

from random import randint
from time import sleep
from dataretrieval import nldi, nwis, wqp

DESTINATION_BUCKET = os.environ.get('DESTINATION_BUCKET')
PROJECT = "National Water Quality Assessment Program (NAWQA)"
# some sites are not found in NLDI, avoid them for now
NOT_FOUND_SITES = [
    "15565447",  # "USGS-"
    "15292700",
]
BAD_GEOMETRY_SITES = [
    "06805500",
    "09306200",
]

BAD_NLDI_SITES = NOT_FOUND_SITES + BAD_GEOMETRY_SITES


def map_retrieval(site):
    """Map function to pull data from NWIS and WQP"""
    print(f"Retrieving samples from site {site}")
    # skip bad sites
    if site in BAD_NLDI_SITES:
        site_list = [site]
    # else query slowly
    else:
        sleep(randint(0, 5))
        site_list = find_neighboring_sites(site)

    # reformat for wqp
    site_list = [f"USGS-{site}" for site in site_list]

    df, _ = wqp_get_results(siteid=site_list,
                            project=PROJECT,
                            )

    try:
        # merge sites
        df['MonitoringLocationIdentifier'] = f"USGS-{site}"
        df.astype(str).to_parquet(f's3://{DESTINATION_BUCKET}/nwqn-samples.parquet',
                                  engine='pyarrow',
                                  partition_cols=['MonitoringLocationIdentifier'],
                                  compression='zstd')
        # optionally, `return df` for further processing

    except Exception as e:
        print(f"No samples returned from site {site}: {e}")


def exponential_backoff(max_retries=5, base_delay=1):
    """Exponential backoff decorator with configurable retries and base delay"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            attempts = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    if attempts > max_retries:
                        raise e
                    # each failed attempt doubles the wait:
                    # base_delay * 2, 4, 8, ... seconds, until max_retries
                    wait_time = base_delay * (2 ** attempts)
```
Collaborator:
I think I follow to this point: are you making it so that with every failed attempt, the wait time increases exponentially between attempts (until max_retries is satisfied)? Might be helpful to add a comment here.
print(f"Retrying in {wait_time} seconds...")
sleep(wait_time)
return wrapper
return decorator


@exponential_backoff(max_retries=5, base_delay=1)
def nwis_get_info(*args, **kwargs):
return nwis.get_info(*args, **kwargs)


@exponential_backoff(max_retries=5, base_delay=1)
def wqp_get_results(*args, **kwargs):
return wqp.get_results(*args, **kwargs)


@exponential_backoff(max_retries=3, base_delay=1)
def find_neighboring_sites(site, search_factor=0.1, fudge_factor=3.0):
"""Find sites upstream and downstream of the given site within a certain distance.

TODO Use geoconnex to determine mainstem length

Parameters
----------
site : str
8-digit site number.
search_factor : float, optional
The factor by which to multiply the watershed length to determine the
search distance.
fudge_factor : float, optional
An additional fudge factor to apply to the search distance, because
watersheds are not circular.
"""
site_df, _ = nwis_get_info(sites=site)
drain_area_sq_mi = site_df["drain_area_va"].values[0]
length = _estimate_watershed_length_km(drain_area_sq_mi)
search_distance = length * search_factor * fudge_factor
# clip between 1 and 9999km
search_distance = max(1.0, min(9999.0, search_distance))

# get upstream and downstream sites
gdfs = [
nldi.get_features(
feature_source="WQP",
feature_id=f"USGS-{site}",
navigation_mode=mode,
distance=search_distance,
data_source="nwissite",
)
for mode in ["UM", "DM"] # upstream and downstream
]

features = pd.concat(gdfs, ignore_index=True)

df, _ = nwis_get_info(sites=list(features.identifier.str.strip('USGS-')))
# drop sites with disimilar different drainage areas
df = df.where(
(df["drain_area_va"] / drain_area_sq_mi) > search_factor,
).dropna(how="all")

site_list = df["site_no"].to_list()

# include the original search site among the neighbors
if site not in site_list:
site_list.append(site)

return site_list


def _estimate_watershed_length_km(drain_area_sq_mi):
"""Estimate the diameter assuming a circular watershed.

Parameters
----------
drain_area_sq_mi : float
The drainage area in square miles.

Returns
-------
float
The diameter of the watershed in kilometers.
"""
# assume a circular watershed
length_miles = 2 * (drain_area_sq_mi / math.pi) ** 0.5
# convert from miles to km
return length_miles * 1.60934


if __name__ == "__main__":
project = "National Water Quality Assessment Program (NAWQA)"

site_df = pd.read_csv(
'NWQN_sites.csv',
Collaborator:
You could set this directly to the ScienceBase URL.

```python
        comment='#',
        dtype={'SITE_QW_ID': str, 'SITE_FLOW_ID': str},
    )

    site_list = site_df['SITE_QW_ID'].to_list()
    # site_list = site_list[:2]  # prune for testing

    fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
    futures = fexec.map(map_retrieval, site_list)

    futures.get_result()
```
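
The retry decorator and the watershed-length heuristic above are pure Python and can be sanity-checked locally without AWS. A minimal, self-contained sketch (the flaky function is a stand-in, not part of the demo):

```python
import math
from time import sleep

# stand-alone copy of the demo's backoff decorator for local testing
def exponential_backoff(max_retries=5, base_delay=1):
    def decorator(func):
        def wrapper(*args, **kwargs):
            attempts = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    attempts += 1
                    if attempts > max_retries:
                        raise
                    wait_time = base_delay * (2 ** attempts)  # 2, 4, 8, ...
                    print(f"Retrying in {wait_time} seconds...")
                    sleep(wait_time)
        return wrapper
    return decorator

calls = {"n": 0}

@exponential_backoff(max_retries=3, base_delay=0.01)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:  # fail the first two attempts, then succeed
        raise RuntimeError("transient error")
    return "ok"

print(flaky())  # retries twice, then prints "ok"

# watershed-length heuristic: diameter of a circle with equal area,
# e.g. 100 sq mi -> 2 * sqrt(100 / pi) ≈ 11.3 mi ≈ 18.2 km
print(2 * (100 / math.pi) ** 0.5 * 1.60934)
```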