19
19
from aqua .util import create_folder , generate_random_string
20
20
from aqua .util import dump_yaml , load_yaml
21
21
from aqua .util import ConfigPath , file_is_complete
22
- from aqua .lra_generator .lra_util import move_tmp_files , replace_intake_vars
23
-
24
-
25
- #from aqua.lra_generator.lra_util import check_correct_ifs_fluxes
22
+ from aqua .util import create_zarr_reference
23
+ from aqua .lra_generator .lra_util import move_tmp_files , list_lra_files_complete , replace_intake_vars
26
24
27
25
28
26
class LRAgenerator ():
@@ -94,47 +92,51 @@ def __init__(self,
94
92
self .logger .warning ('IMPORTANT: no file will be created, this is a dry run' )
95
93
96
94
self .nproc = int (nproc )
97
- self .tmpdir = tmpdir
95
+ if tmpdir is None :
96
+ self .logger .warning ('No tmpdir specifield, will use outdir' )
97
+ self .tmpdir = os .path .join (outdir , 'tmp' )
98
+ else :
99
+ self .tmpdir = tmpdir
100
+
98
101
if self .dask :
99
102
self .logger .info ('Running dask.distributed with %s workers' , self .nproc )
100
- if not self .tmpdir :
101
- raise KeyError ('Please specify tmpdir for dask.distributed.' )
102
103
103
104
self .tmpdir = os .path .join (self .tmpdir , 'LRA_' +
104
- generate_random_string (10 ))
105
+ generate_random_string (10 ))
105
106
106
- if model :
107
+ # safechecks
108
+ if model is not None :
107
109
self .model = model
108
110
else :
109
111
raise KeyError ('Please specify model.' )
110
112
111
- if exp :
113
+ if exp is not None :
112
114
self .exp = exp
113
115
else :
114
116
raise KeyError ('Please specify experiment.' )
115
117
116
- if source :
118
+ if source is not None :
117
119
self .source = source
118
120
else :
119
121
raise KeyError ('Please specify source.' )
120
122
123
+ if var is not None :
124
+ self .var = var
125
+ else :
126
+ raise KeyError ('Please specify variable string or list.' )
127
+
128
+ if resolution is not None :
129
+ self .resolution = resolution
130
+ else :
131
+ raise KeyError ('Please specify resolution.' )
132
+ self .logger .info ('Variable(s) to be processed: %s' , self .var )
133
+
121
134
self .kwargs = kwargs
122
135
123
136
Configurer = ConfigPath (configdir = configdir )
124
137
self .configdir = Configurer .configdir
125
138
self .catalog = catalog
126
139
127
- # Initialize variable(s)
128
- self .var = var
129
-
130
- if not self .var :
131
- raise KeyError ('Please specify variable string or list.' )
132
- self .logger .info ('Variable(s) to be processed: %s' , self .var )
133
-
134
- self .resolution = resolution
135
- if not self .resolution :
136
- raise KeyError ('Please specify resolution.' )
137
-
138
140
self .frequency = frequency
139
141
if not self .frequency :
140
142
self .logger .info ('Frequency not specified, no time averagin will be performed.' )
@@ -144,7 +146,7 @@ def __init__(self,
144
146
'units' : 'days since 1850-01-01 00:00:00' ,
145
147
'calendar' : 'standard' ,
146
148
'dtype' : 'float64' }
147
-
149
+
148
150
self .var_encoding = {
149
151
'dtype' : 'float64' ,
150
152
'zlib' : True ,
@@ -162,7 +164,10 @@ def __init__(self,
162
164
self .last_record = None
163
165
self .check = False
164
166
165
- # Create LRA folder
167
+ # Create LRA folders
168
+ if outdir is None :
169
+ raise KeyError ('Please specify outdir.' )
170
+
166
171
self .outdir = os .path .join (outdir , self .model , self .exp , self .resolution )
167
172
168
173
if self .frequency :
@@ -205,7 +210,7 @@ def retrieve(self):
205
210
206
211
self .logger .info ('Retrieving data...' )
207
212
self .data = self .reader .retrieve (var = self .var )
208
-
213
+
209
214
self .logger .debug (self .data )
210
215
211
216
def generate_lra (self ):
@@ -223,10 +228,10 @@ def generate_lra(self):
223
228
224
229
else : # Only one variable
225
230
self ._write_var (self .var )
226
-
231
+
227
232
self .logger .info ('Move tmp files to output directory' )
228
233
move_tmp_files (self .tmpdir , self .outdir )
229
-
234
+
230
235
# Cleaning
231
236
self .data .close ()
232
237
self ._close_dask ()
@@ -282,6 +287,97 @@ def create_catalog_entry(self):
282
287
# dump the update file
283
288
dump_yaml (outfile = catalogfile , cfg = cat_file )
284
289
290
def create_zarr_entry(self, verify=True):
    """
    Create a Zarr entry in the catalog for the LRA

    JSON reference files are generated inside a 'zarr' subfolder of the
    LRA output directory, one per key of the complete-files dictionary
    ('lra-yearly-<key>.json') and one per key of the partial-files
    dictionary ('lra-monthly-<key>.json'). Their paths are stored as
    'reference::' urlpaths in the catalog source
    'lra-<resolution>-<frequency>-zarr' of the current model/exp.

    Args:
        verify: open the newly written zarr source with the Reader and
                verify it can be read. If it cannot, the entry is
                removed from the catalog file.

    Raises:
        FileNotFoundError: if no reference file could be created
    """

    entry_name = f'lra-{self.resolution}-{self.frequency}-zarr'
    full_dict, partial_dict = list_lra_files_complete(self.outdir)
    self.logger.info('Creating zarr files for %s %s %s', self.model, self.exp, entry_name)

    # extra zarr only directory
    zarrdir = os.path.join(self.outdir, 'zarr')
    create_folder(zarrdir)

    # this dictionary-based structure is overkill but guarantees flexibility:
    # complete and partial files are handled by the same loop, only the
    # log label and the json file prefix differ
    groups = (
        ('full', 'yearly', full_dict),
        ('partial', 'monthly', partial_dict),
    )
    urlpath = []
    for label, prefix, files in groups:
        for key, value in files.items():
            self.logger.debug('Creating zarr files for %s files %s', label, key)
            if not value:
                continue
            jsonfile = os.path.join(zarrdir, f'lra-{prefix}-{key}.json')
            # the helper may return None: in that case nothing is added
            jsonfile = create_zarr_reference(value, jsonfile, loglevel=self.loglevel)
            if jsonfile is not None:
                urlpath.append(f'reference::{jsonfile}')

    if not urlpath:
        raise FileNotFoundError('No files found to create zarr reference')

    # apply intake replacement: it works on strings, so loop on the list
    urlpath = [replace_intake_vars(catalog=self.catalog, path=path) for path in urlpath]

    # load, add the block and close
    catalogfile = os.path.join(self.configdir, 'catalogs', self.catalog,
                               'catalog', self.model, self.exp + '.yaml')
    cat_file = load_yaml(catalogfile)

    if entry_name in cat_file['sources']:
        # entry exists: keep everything else untouched
        self.logger.info('Catalog entry for %s %s %s exists, updating the urlpath only...',
                         self.model, self.exp, entry_name)
        cat_file['sources'][entry_name]['args']['urlpath'] = urlpath
    else:
        self.logger.info('Creating zarr catalog entry %s %s %s', self.model, self.exp, entry_name)

        # define the block to be uploaded into the catalog
        cat_file['sources'][entry_name] = {
            'driver': 'zarr',
            'description': f'LRA data {self.frequency} at {self.resolution} reference on zarr',
            'args': {
                'consolidated': False,
                'combine': 'by_coords',
                'urlpath': urlpath
            },
            'metadata': {
                'source_grid_name': 'lon-lat',
            },
            'fixer_name': False
        }

    dump_yaml(outfile=catalogfile, cfg=cat_file)

    # verify the zarr entry makes sense
    if verify:
        self.logger.info('Verifying that zarr entry can be loaded...')
        try:
            # open the entry that has just been written, not a fixed source name,
            # otherwise a valid entry could be removed for the wrong reason
            reader = Reader(model=self.model, exp=self.exp, source=entry_name)
            reader.retrieve()
            self.logger.info('Zarr entry successfully created!!!')
        except (KeyError, ValueError) as e:
            self.logger.error('Cannot load zarr LRA with error --> %s', e)
            self.logger.error('Zarr source is not accessible by the Reader likely due to irregular amount of NetCDF file')
            self.logger.error('To avoid issues in the catalog, the entry will be removed')
            self.logger.error('In case you want to keep it, please run with verify=False')
            # reload from disk and drop the entry; pop() tolerates a missing key
            cat_file = load_yaml(catalogfile)
            cat_file['sources'].pop(entry_name, None)
            dump_yaml(outfile=catalogfile, cfg=cat_file)
380
+
285
381
def _set_dask (self ):
286
382
"""
287
383
Set up dask cluster
@@ -319,15 +415,15 @@ def _concat_var_year(self, var, year):
319
415
from the same year
320
416
"""
321
417
322
- #infiles = os.path.join(self.outdir,
418
+ # infiles = os.path.join(self.outdir,
323
419
# f'{var}_{self.exp}_{self.resolution}_{self.frequency}_{year}??.nc')
324
420
infiles = self .get_filename (var , year , month = '??' )
325
421
if len (glob .glob (infiles )) == 12 :
326
422
xfield = xr .open_mfdataset (infiles )
327
423
self .logger .info ('Creating a single file for %s, year %s...' , var , str (year ))
328
424
outfile = self .get_filename (var , year )
329
- #outfile = os.path.join(self.tmpdir,
330
- # f'{var}_{self.exp}_{self.resolution}_{self.frequency}_{year}.nc')
425
+ # outfile = os.path.join(self.tmpdir,
426
+ # f'{var}_{self.exp}_{self.resolution}_{self.frequency}_{year}.nc')
331
427
# clean older file
332
428
if os .path .exists (outfile ):
333
429
os .remove (outfile )
@@ -338,7 +434,6 @@ def _concat_var_year(self, var, year):
338
434
self .logger .info ('Cleaning %s...' , infile )
339
435
os .remove (infile )
340
436
341
-
342
437
def get_filename (self , var , year = None , month = None , tmp = False ):
343
438
"""Create output filenames"""
344
439
@@ -437,7 +532,7 @@ def _remove_regridded(self, data):
437
532
# continue
438
533
# else:
439
534
# self.logger.warning('Monthly file %s already exists, overwriting as requested...', outfile)
440
-
535
+
441
536
# # real writing
442
537
# if self.definitive:
443
538
# self.write_chunk(temp_data, outfile)
@@ -480,7 +575,7 @@ def _write_var_catalog(self, var):
480
575
for year in years :
481
576
482
577
self .logger .info ('Processing year %s...' , str (year ))
483
- yearfile = self .get_filename (var , year = year )
578
+ yearfile = self .get_filename (var , year = year )
484
579
485
580
# checking if file is there and is complete
486
581
filecheck = file_is_complete (yearfile , loglevel = self .loglevel )
@@ -498,7 +593,7 @@ def _write_var_catalog(self, var):
498
593
months = [months [0 ]]
499
594
for month in months :
500
595
self .logger .info ('Processing month %s...' , str (month ))
501
- outfile = self .get_filename (var , year = year , month = month )
596
+ outfile = self .get_filename (var , year = year , month = month )
502
597
503
598
# checking if file is there and is complete
504
599
filecheck = file_is_complete (outfile , loglevel = self .loglevel )
@@ -511,12 +606,9 @@ def _write_var_catalog(self, var):
511
606
512
607
month_data = year_data .sel (time = year_data .time .dt .month == month )
513
608
514
- #self.logger.debug(month_data.mean().values)
515
- #self.logger.debug(month_data)
516
-
517
609
# real writing
518
610
if self .definitive :
519
- tmpfile = self .get_filename (var , year = year , month = month , tmp = True )
611
+ tmpfile = self .get_filename (var , year = year , month = month , tmp = True )
520
612
schunk = time ()
521
613
self .write_chunk (month_data , tmpfile )
522
614
tchunk = time () - schunk
@@ -537,7 +629,7 @@ def _write_var_catalog(self, var):
537
629
def write_chunk (self , data , outfile ):
538
630
"""Write a single chunk of data - Xarray Dataset - to a specific file
539
631
using dask if required and monitoring the progress"""
540
-
632
+
541
633
# update data attributes for history
542
634
if self .frequency :
543
635
log_history (data , f'regridded from { self .reader .src_grid_name } to { self .resolution } and from frequency { self .reader .orig_freq } to { self .frequency } through LRA generator' )
@@ -576,13 +668,10 @@ def write_chunk(self, data, outfile):
576
668
avg_mem = np .mean (array_data [:, 1 ])/ 1e9
577
669
max_mem = np .max (array_data [:, 1 ])/ 1e9
578
670
self .logger .info ('Avg memory used: %.2f GiB, Peak memory used: %.2f GiB' , avg_mem , max_mem )
579
-
671
+
580
672
else :
581
673
with ProgressBar ():
582
674
write_job .compute ()
583
675
584
676
del write_job
585
677
self .logger .info ('Writing file %s successfull!' , outfile )
586
-
587
-
588
-
0 commit comments