merra_etl.py
import xarray as xr
import numpy as np
from pathlib import Path
from typing import Optional, Union, Sequence
def binary_round(
ds: Union[xr.Dataset, xr.DataArray, np.ndarray],
*,
binary_bits: Optional[int] = None,
decimal_digits: Optional[int] = None,
) -> Union[xr.Dataset, xr.DataArray, np.ndarray]:
"""Round an array of floats to a specific number of bits. The number of bits can be specified directly, with binary_bits, or calculated (conservatively) from decimal_digits.
    Note: a negative value rounds to the left of the radix point (i.e., coarser than whole numbers).
Motivation is to enable higher compression by removing false precision from data.
Example: binary_round(np.array([1.1234567]), decimal_digits=2) -> np.ndarray(1.125)
Parameters
----------
ds : Union[xr.Dataset, xr.DataArray, np.ndarray]
dataset
binary_bits : Optional[int], optional
Number of bits of precision to keep. Choose this OR decimal_digits. By default None
decimal_digits : Optional[int], optional
Number of decimal digits of precision to keep. Choose this OR binary_bits. By default None
Returns
-------
Union[xr.Dataset, xr.DataArray, np.ndarray]
dataset with rounded values
Raises
------
ValueError
If both binary_bits and decimal_digits are used
ValueError
If neither binary_bits or decimal_digits are used
"""
    if binary_bits is not None:
        bits = binary_bits
        if decimal_digits is not None:
            raise ValueError(
                f"Only one of binary_bits and decimal_digits can be used. Given {binary_bits} and {decimal_digits}, respectively."
            )
    elif decimal_digits is not None:
        bits = np.ceil(decimal_digits * np.log(10) / np.log(2))
    else:
        raise ValueError(
            f"One of binary_bits and decimal_digits must be an integer. Given {binary_bits} and {decimal_digits}, respectively."
        )
multiplier = 2 ** bits
divisor = 2 ** (
-1 * bits
) # faster to multiply by reciprocal than to divide, according to some StackOverflow post. Probably doesn't matter much.
return (ds * multiplier).round() * divisor
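# Illustrative usage sketch (not part of the original module): decimal_digits=2
# keeps ceil(2 * log2(10)) = 7 binary bits, so values are rounded to multiples of 1/128.
def _example_binary_round() -> None:
    arr = np.array([1.1234567, 3.14159])
    print(binary_round(arr, decimal_digits=2))  # -> [1.125  3.140625]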
def transforms(ds: xr.Dataset) -> xr.Dataset:
"""Column transforms for MERRA-2 data. Note that modifications are performed IN PLACE, but I can't drop variables in place so have to return a new xr.Dataset without those variables.
* Change temperature units from K to degrees C
* Convert wind vector components to magnitude and direction (degrees in [0,360] from North, positive going clockwise)
* Log10 transform precipitation
* Remove unused variables
Parameters
----------
ds : xr.Dataset
MERRA-2 dataset
Returns
-------
xr.Dataset
Modifications are IN PLACE but must return a dataset in order to drop variables
"""
ds.update(ds[["TS", "T10M"]] - 273.15) # convert units K -> C
ds["WS50M"] = np.sqrt(np.square(ds["V50M"]) + np.square(ds["U50M"]))
ds["WDIR50M"] = np.mod(
np.arctan2(ds["U50M"], ds["V50M"]) + 2 * np.pi, 2 * np.pi
) * (
180 / np.pi
) # This gives angle from North, positive going clockwise
ds["PRECTOTCORR"] = np.log10(ds["PRECTOTCORR"] + 2 ** -48)
return ds.drop_vars(["U50M", "V50M", "Z0M"])
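# Illustrative sketch (hypothetical single-point values, not real MERRA-2 data):
# run transforms() on a tiny dataset; U=3, V=4 should give WS50M=5.0 and WDIR50M~36.87.
def _example_transforms() -> None:
    ds = xr.Dataset(
        {
            name: (("time",), np.array([val], dtype=np.float32))
            for name, val in {
                "TS": 288.15, "T10M": 287.0,      # Kelvin -> expect 15.0 and 13.85 degC
                "U50M": 3.0, "V50M": 4.0,         # -> WS50M=5.0, WDIR50M~=36.87 deg
                "PRECTOTCORR": 1e-5, "Z0M": 0.1,  # precip gets log10-transformed; Z0M is dropped
            }.items()
        },
        coords={"time": [0]},
    )
    out = transforms(ds)
    print(out["TS"].item(), out["WS50M"].item(), out["WDIR50M"].item())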
def reduce_precision(ds: xr.Dataset, fp16: bool = False) -> None:
"""Remove false precision IN PLACE to facilitate downstream compression. Two methods are conversion to float16 and back (3.3 decimal digits precision) and binary rounding to fixed precision. The appropriate levels were determined in './data/merge and rechunk.ipynb'. **Hardcoded** for current iteration of MERRA-2 data.
Parameters
----------
ds : xr.Dataset
dataset of MERRA-2
fp16 : bool, optional
If True, use fp16 conversion method. If False, use fixed precision rounding method. By default False
"""
ds[["lat", "lon"]].astype(np.float32)
ds["PS"] = binary_round(ds["PS"], decimal_digits=-1).astype(np.int32)
mask = ds["PRECTOTCORR"] <= -14 # assumes log10 applied first!
ds["PRECTOTCORR"] = xr.where(
mask, 0, ds["PRECTOTCORR"]
) # threshold tiny floats to 0
if fp16:
f16 = ["GHLAND", "RHOA", "PRECTOTCORR", "RISFC", "TS", "T10M"]
ds.update(ds[f16].astype(np.float16).astype(np.float32))
for dec, cols in {1: ["WDIR50M"], 3: ["WS50M"]}.items():
ds.update(binary_round(ds[cols], decimal_digits=dec))
else:
prec = {
1: ["GHLAND", "RISFC", "TS", "T10M", "WDIR50M"],
2: ["PRECTOTCORR"],
3: ["RHOA", "WS50M"],
}
for dec, cols in prec.items():
ds.update(binary_round(ds[cols], decimal_digits=dec))
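# Illustrative sketch (hypothetical values): the fixed-precision rounding path.
# reduce_precision expects PRECTOTCORR to already be log10-transformed (see transforms()).
def _example_reduce_precision() -> None:
    values = {"PS": 101325.0, "GHLAND": 0.5, "RHOA": 1.2, "PRECTOTCORR": 1e-5,
              "RISFC": 0.3, "TS": 15.0, "T10M": 14.0, "WS50M": 1.2345, "WDIR50M": 36.87}
    ds = xr.Dataset(
        {n: (("lat", "lon"), np.full((1, 1), v)) for n, v in values.items()},
        coords={"lat": [40.0], "lon": [-105.0]},
    )
    ds["PRECTOTCORR"] = np.log10(ds["PRECTOTCORR"])  # apply the log10 transform first
    reduce_precision(ds, fp16=False)
    print(ds["PS"].item(), ds["PS"].dtype, ds["WS50M"].item())  # 101328 int32 1.234375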
def make_divisions(
ds: xr.Dataset,
*,
points_per_partition: Optional[int] = None,
mb_per_partition: Optional[int] = None,
) -> list:
"""Calculate partition divisions for converting an xr.Dataset to a dask.DataFrame given either points per partition or megabytes per partition. Either option will produce partitions split on grid point boundaries, to ensure load order wont matter in the future.
Parameters
----------
ds : xr.Dataset
dataset with all component arrays having the same time dimension
points_per_partition : Optional[int], optional
Number of grid points to include in each dask.DataFrame partition, by default None
mb_per_partition : Optional[int], optional
Max size in megabytes of each dask.DataFrame partition, by default None
Returns
-------
list
        Form is [0, step_size, 2 * step_size, ..., last index value]
Raises
------
ValueError
When both points_per_partition and mb_per_partition are specified (conflict)
ValueError
When neither points_per_partition or mb_per_partition are specified
"""
time = ds.time.size
total_points = len(ds.lat) * len(ds.lon)
if points_per_partition:
if mb_per_partition:
raise ValueError(
f"Choose only one of points_per_partition or mb_ per_partition. Given {points_per_partition} and {mb_per_partition}, respectively"
)
step_size = time * points_per_partition
steps = total_points // points_per_partition
return [step_size * i for i in range(steps)] + [time * total_points - 1]
elif mb_per_partition:
mb_per_point = ds.nbytes / total_points / (2 ** 20)
points = int(mb_per_partition // mb_per_point)
return make_divisions(ds, points_per_partition=points)
else:
raise ValueError(
f"Must choose one of points_per_partition or mb_ per_partition. Given {points_per_partition} and {mb_per_partition}, respectively"
)
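# Illustrative sketch: divisions for a dataset with 4 time steps on a 2x3 grid
# (6 grid points). Two points per partition gives a split every 2 * 4 = 8 rows.
def _example_make_divisions() -> None:
    ds = xr.Dataset(
        {"TS": (("time", "lat", "lon"), np.zeros((4, 2, 3)))},
        coords={"time": np.arange(4), "lat": [0.0, 1.0], "lon": [0.0, 1.0, 2.0]},
    )
    print(make_divisions(ds, points_per_partition=2))  # [0, 8, 16, 23]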
def rechunk(ds: xr.Dataset, megabytes_per_chunk: Union[int, float] = 100) -> xr.Dataset:
"""Change chunk size to something more appropriate. Daily netCDFs are read in as all-space, daily time. I want the opposite: all-time, chunked space. Spatial tiling is determined by megabytes_per_chunk.
    Note that megabytes_per_chunk is approximate and usually conservative: the chunk-count estimate is conservative but the step size is aggressive, so the two errors often, but not always, offset.
Parameters
----------
ds : xr.Dataset
dataset where all variables have the same time dimension
megabytes_per_chunk : Union[int, float], optional
Target chunk size in MB. Recall that chunking occurs per variable. By default 100
Returns
-------
xr.Dataset
        dataset rechunked to all-time, spatially tiled chunks
Raises
------
ValueError
When an individual grid point is larger than the given megabytes_per_chunk
"""
total_points = len(ds.lat) * len(ds.lon)
mb_per_point = ds.nbytes / total_points / 2 ** 20
if mb_per_point > megabytes_per_chunk:
raise ValueError(
f"Each grid point is larger than the given chunk size.\nPoint size: {mb_per_point:.2f}MB\n megabytes_per_chunk: {megabytes_per_chunk:.2f}\nThis algorithm hardcodes a full time slice."
)
points_per_chunk = (
megabytes_per_chunk / mb_per_point * (len(ds.variables) - len(ds.coords))
) # chunks are per-variable. ds.variables includes coords, so they must be removed
if points_per_chunk < total_points:
# this binary tiling algorithm is conservative on number of chunks, so I made step size aggressive to compensate.
log_n_chunks = np.ceil(
np.log2(total_points / points_per_chunk)
) # nearest power of 2, rounded up
log_lat_chunks = log_n_chunks // 2
log_lon_chunks = log_n_chunks - log_lat_chunks
        lat_step = int(np.ceil(len(ds.lat) / (2 ** log_lat_chunks)))
        lon_step = int(np.ceil(len(ds.lon) / (2 ** log_lon_chunks)))
return ds.chunk(chunks={"lat": lat_step, "lon": lon_step, "time": None})
else:
return ds.chunk(chunks={"lat": None, "lon": None, "time": None})
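# Illustrative sketch (hypothetical tiny dataset): retile a small in-memory dataset.
# With one float32 variable on a 16x16 grid, a ~0.01 MB target forces a 2x2 spatial tiling (8x8 steps).
def _example_rechunk() -> None:
    ds = xr.Dataset(
        {"TS": (("time", "lat", "lon"), np.zeros((24, 16, 16), dtype=np.float32))},
        coords={"time": np.arange(24), "lat": np.arange(16.0), "lon": np.arange(16.0)},
    )
    print(rechunk(ds, megabytes_per_chunk=0.01).chunks)  # lat/lon chunked in steps of 8, time kept whole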
def merra_nc4_to_parquet(
files_in: Sequence[Path],
dir_out: Path,
precision_reduction: Optional[str] = "round",
max_megabytes_per_file: int = 100,
) -> None:
"""API to convert a folder of daily netCDF MERRA-2 data to columnar parquet files. This method only works for data that fits in memory.
Parameters
----------
files_in : Sequence[Path]
sequence of file paths, such as Path(<directory>).glob(<PATTERN>)
dir_out : Path
directory where parquet files will be written
precision_reduction : Optional[str], optional
One of None, 'round', or 'fp16', by default 'round'
max_megabytes_per_file : int, optional
max uncompressed size of each output parquet file, in megabytes. Actual files will be smaller due to compression. By default 100
Returns
-------
None
"""
    ds = xr.open_mfdataset(
        files_in,
        combine="by_coords",  # the concat dimension is inferred from coordinates; recent xarray versions reject an explicit concat_dim here
        data_vars="minimal",
        coords="minimal",
        compat="no_conflicts",
    )
ds = rechunk(ds)
ds = transforms(ds)
if precision_reduction is not None:
fp16 = precision_reduction == "fp16"
reduce_precision(ds, fp16=fp16)
divisions = make_divisions(ds, mb_per_partition=max_megabytes_per_file)
ddf = ds.to_dask_dataframe()
ddf.repartition(divisions=divisions).to_parquet(
dir_out, compression="snappy", write_index=False
)
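# Hedged end-to-end usage sketch: the directory names below are hypothetical
# placeholders, not paths from the original project.
if __name__ == "__main__":
    nc4_files = sorted(Path("data/merra2_daily").glob("*.nc4"))  # hypothetical input directory
    merra_nc4_to_parquet(
        nc4_files,
        dir_out=Path("data/merra2_parquet"),  # hypothetical output directory
        precision_reduction="round",
        max_megabytes_per_file=100,
    )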