#!/usr/bin/env python
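# zarr_csv.py: for each USGS gage within the watershed of interest, gather hourly streamflow from
# the WRF-Hydro simulation (local zarr output), the NWM retrospective v2.0/v2.1/v3.0 zarr stores on
# AWS S3, and the USGS observed record (via dataretrieval/NWIS), then merge them into one CSV per year.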
# imports
import glob
import logging
import fsspec
import ujson
import time
import dask
import dask.delayed
import xarray as xr
import pandas as pd
import dataretrieval.nwis as nwis # retrieve the observed streamflow of USGS gages
import s3fs
from datetime import datetime
import os
import random
import dask.dataframe as dd
from netCDF4 import Dataset
from dask.delayed import delayed
import zarr
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import pathlib
# specify the input year
#year='2018'
year = os.getenv("param_output_year")
# Specify the date range used to create the csv files for the variables of interest
dates=("{0}-01-01".format(year),"{0}-12-31".format(year))
pathlib.Path('/job/result/csv_files/').mkdir(exist_ok=True)
# path of NWM retrospective (NWM-R2.0)
s3_path = 's3://noaa-nwm-retro-v2-zarr-pds'
# Connect to S3
s3 = s3fs.S3FileSystem(anon=True)
store = s3fs.S3Map(root=s3_path, s3=s3, check=False)
# Adding data from the NWM retrospective version 3.0 bundle
s3_v3_path = 's3://noaa-nwm-retrospective-3-0-pds/CONUS/zarr/chrtout.zarr'
ds_chrtout = xr.open_zarr(fsspec.get_mapper(s3_v3_path,anon=True),consolidated=True)
s3_path_v2_1 = "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr"
ds_chrtout2 = xr.open_zarr(fsspec.get_mapper(s3_path_v2_1,anon=True),consolidated=True)
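# Note: xr.open_zarr only reads metadata here; the actual streamflow values for the v2.1 and v3.0
# stores are pulled later, when .sel(...).persist() is called inside the gage loop below.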
# load the dataset
start_time = time.time()
print("before reading amazon bucket data", start_time)
ds = xr.open_zarr(store=store, consolidated=True)
end_time = time.time()
elapsed_time = end_time - start_time
# Print the loop execution time
print("time taken to load zarr data from amazon bucket: {:.2f} seconds".format(elapsed_time))
print("done loading Zarr dataset")
# path of the WRF-Hydro model results/outputs
output_wrfhydro='/anvil/scratch/x-cybergis/compute/1716398519PVftu/Outputs'
# Creating a xarray data set from reading the existing zarr files for the specific input year
reach_ds = xr.open_dataset("/job/result/zarr_files/{0}.zarr".format(year)).chunk(chunks={"time":67, "feature_id":15000})
# path of the route link file "Route_Link.nc"
jobid_cuahsi_subset_domain='17163969355sWU0'
routelink ='/compute_shared/{}/Route_Link.nc'.format(jobid_cuahsi_subset_domain)
# convert the route link to a dataframe
route_df = xr.open_dataset(routelink).to_dataframe()
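# The "gages" variable in Route_Link.nc stores gage IDs as space-padded byte strings;
# decoding and stripping them leaves an empty string for reaches without a gage.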
route_df.gages = route_df.gages.str.decode('utf-8').str.strip()
########################################################################
########################################################################
## Querying USGS gauges and river reaches that exist within our spatial domain (watershed of interest).
print("route_df",route_df.gages)
usgs_gages = route_df.loc[route_df['gages'] != '']
print("usgs_gages",usgs_gages)
usgs_ids=usgs_gages.gages ## USGS gage IDs that exist within the watershed of interest
# Initialize an empty dataframe to collect the dataframes produced in each iteration of the for-loop.
output_df=pd.DataFrame()
start_time1 = time.time()
if (usgs_ids.empty):
    print("No USGS gages exist within this watershed, so no csv file is generated")
# Iterate through the existing USGS gages located within the spatial boundary of the watershed of interest.
for gid in usgs_ids:
    try:
        ll=str(list(usgs_gages.loc[usgs_gages.gages == gid].link)) ## NWM stream link (feature_id) corresponding to this USGS gage ID
        ll=ll.replace("[","")
        ll=ll.replace("]","")
        timerange = slice(dates[0], dates[1])
        flow_data=nwis.get_record(sites=str(gid),service='iv', start=dates[0], end=dates[1], access='3')
        if flow_data.empty or "00060" not in flow_data.columns:
            continue
        # Retrieve the simulated streamflow data from the WRF-Hydro run
        sim_streamflow=reach_ds.sel(feature_id=int(ll), time=timerange).streamflow.persist()
        sim_streamflow_df=sim_streamflow.to_dataframe()["streamflow"]
        sim_streamflow_df=sim_streamflow_df.to_frame()
        # Retrieve the streamflow data from NWM-R2.0 (zarr format)
        R2_streamflow = ds.sel(feature_id=int(ll),
                               time=timerange).streamflow.persist()
        R2_streamflow_df=R2_streamflow.to_dataframe()["streamflow"]
        R2_streamflow_df=R2_streamflow_df.to_frame()
        # Retrieve the simulated streamflow data from NWM retrospective version 3.0
        R2_streamflow_V3 = ds_chrtout.sel(feature_id=int(ll),time=timerange).streamflow.persist()
        R2_streamflow_df_V3=R2_streamflow_V3.to_dataframe()["streamflow"]
        R2_streamflow_df_V3=R2_streamflow_df_V3.to_frame()
        # Retrieve the simulated streamflow data from NWM retrospective version 2.1
        R2_streamflow_V2_1 = ds_chrtout2.sel(feature_id=int(ll),time=timerange).streamflow.persist()
        R2_streamflow_df_V2_1=R2_streamflow_V2_1.to_dataframe()["streamflow"]
        R2_streamflow_df_V2_1=R2_streamflow_df_V2_1.to_frame()
        # Retrieve the USGS observed streamflow
        # instantaneous values (iv, one measurement every 15 minutes) were already retrieved
        # above as flow_data, so reuse them instead of querying NWIS a second time
        qobs=flow_data["00060"]/35.3147 ## divide by 35.3147 to convert cfs to m3/sec
        # print(qobs)
        # convert instantaneous streamflow (15 minutes) to hourly values
        # select only the date and hour parts of the index
        qobs.index = qobs.index.floor('H')
        # calculate the average value for each date and hour
        qobs_hourly = qobs.groupby(qobs.index).mean()
        # format the datetime index to a new format with only year, month, day, and hour
        qobs_hourly.index=qobs_hourly.index.strftime('%Y-%m-%d %H:00:00')
        qobs_hourly.index=pd.to_datetime(qobs_hourly.index)
        # concatenate the two dataframes using an outer join to fill missing values with NaN
        df_concat1 = pd.concat([R2_streamflow_df, qobs_hourly],
                               axis=1,
                               join="outer")
        # rename the columns of the streamflow records obtained from NWM-R2.0 and the USGS observations
        df_concat1.rename(columns = {'streamflow':'NWM-R2.0_Streamflow_{}'.format(gid)}, inplace=True)
        df_concat1.rename(columns = {'00060':'Obs_Streamflow_{}'.format(gid)}, inplace=True)
        # rename the columns of the streamflow records obtained from NWM-R2.1, NWM-R3.0, and the WRF-Hydro simulation
        R2_streamflow_df_V2_1.rename(columns = {'streamflow':'NWM-R2.1_Streamflow_{}'.format(gid)}, inplace=True)
        R2_streamflow_df_V3.rename(columns = {'streamflow':'NWM-R3.0_Streamflow_{}'.format(gid)}, inplace=True)
        sim_streamflow_df.rename(columns = {"streamflow":'Sim_Streamflow_{}'.format(gid)}, inplace=True)
        # concatenate the data frames
        df_concat2 = pd.concat([sim_streamflow_df,df_concat1,R2_streamflow_df_V2_1,R2_streamflow_df_V3],axis=1,join="outer")
        output_df=pd.concat([df_concat2, output_df], axis=1, join="outer")
    except Exception as ex:
        print("an exception has occurred",ex)
        continue
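# output_df now holds one column per gage and data source, named Sim_Streamflow_<gid>,
# NWM-R2.0_Streamflow_<gid>, Obs_Streamflow_<gid>, NWM-R2.1_Streamflow_<gid>, and
# NWM-R3.0_Streamflow_<gid>, indexed by hourly timestamps.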
output_df.index.name = "time"
# save the merged dataframe to a CSV file
output_df.to_csv('/job/result/csv_files/{0}.csv'.format(year), index=True)
end_time1 = time.time()
# Calculate the elapsed time
elapsed_time1 = end_time1 - start_time1
# Print the loop execution time
print("Time take to create csv files for chrtout data: {:.2f} seconds".format(elapsed_time1))