"""Lambda function handler"""
import contextlib
import json
import os
import sys
import tempfile
import time
from datetime import datetime
import h5py
import numpy as np
import pandas as pd
from cloud_fs import FileSystem
from rex import init_logger, safe_json_load
from nsrdb import NSRDB
from nsrdb.data_model.clouds import CloudVar
class LambdaHandler(dict):
"""Lambda Handler class"""
def __init__(self, event):
"""
Parameters
----------
event : dict
Event or test dictionary
"""
self.update(
{k.lower(): self._parse_env_var(v) for k, v in os.environ.items()}
)
if isinstance(event, str):
event = safe_json_load(event)
self.update(event)
rfd = self.get('run_full_day', False)
self._var_meta, self._timestep = self.load_var_meta(
self['var_meta'], self.day, run_full_day=rfd
)
self._fpath_out = None
self._data_model = None
log_level = 'DEBUG' if self.get('verbose', False) else 'INFO'
self.logger = init_logger('nsrdb', log_level=log_level)
self.logger.propagate = False
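# Note (not from the original source): environment variables are merged in
# first and the event is applied second, so event keys override environment
# variables of the same (lower-cased) name.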
@property
def day(self):
"""
Date (day) to run NSRDB for, in the format YYYYMMDD
Returns
-------
str
"""
day = self.get('date', None)
if not day:
day = datetime.utcnow().strftime('%Y%m%d')
return day
@property
def year(self):
"""
Year of the day being run through NSRDB
Returns
-------
str
"""
return self.day[:4]
@property
def grid(self):
"""
Path to .csv with the NSRDB grid to compute NSRDB values on
Returns
-------
str
"""
return self['grid']
@property
def fpath_out(self):
"""
Final file path of output .h5 file, typically on S3
Returns
-------
str
"""
if self._fpath_out is None:
out_dir = self['out_dir']
if not out_dir.endswith(self.year):
out_dir = os.path.join(out_dir, self.year)
fname = '{}-{}.h5'.format(self['file_prefix'], self.day)
self._fpath_out = os.path.join(out_dir, fname)
return self._fpath_out
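# Illustrative note (not from the original source): the resulting path has
# the form '<out_dir>/<year>/<file_prefix>-<YYYYMMDD>.h5', e.g. the
# hypothetical 's3://example-bucket/nsrdb/2024/nsrdb-20240101.h5'.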
@property
def temp_dir(self):
"""
Path to the root of the temporary directory that the .h5 file will be
written to before being copied to self.fpath_out
Returns
-------
str
"""
temp_dir = self.get('temp_dir', '/tmp')
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
return temp_dir
@property
def out_dir(self):
"""
Final output directory, typically on S3
Returns
-------
str
"""
return os.path.dirname(self.fpath_out)
@property
def fname(self):
"""
Output .h5 file name
Returns
-------
str
"""
return os.path.basename(self.fpath_out)
@property
def var_meta(self):
"""
DataFrame with variable meta data needed to run NSRDB
Returns
-------
pandas.DataFrame
"""
return self._var_meta
@property
def timestep(self):
"""
Numerical timestep of data to update in self.fpath_out. The actual
timestamp being updated is data_model.nsrdb_ti[timestep] or
time_index[timestep], where time_index is pulled from self.fpath_out.
If None, the entire day will be run and dumped to .h5.
Returns
-------
int
"""
return self._timestep
@property
def factory_kwargs(self):
"""
Dictionary of factory kwargs to pass to NSRDB. These will override
values in self.var_meta
Returns
-------
dict
"""
factory_kwargs = {
'air_temperature': {'handler': 'GfsVar'},
'alpha': {'handler': 'NrelVar'},
'aod': {'handler': 'NrelVar'},
'dew_point': {'handler': 'GfsDewPoint'},
'ozone': {'handler': 'GfsVar'},
'relative_humidity': {'handler': 'GfsVar'},
'ssa': {'handler': 'NrelVar'},
'surface_pressure': {'handler': 'GfsVar'},
'total_precipitable_water': {'handler': 'GfsVar'},
'wind_direction': {'handler': 'GfsVar'},
'wind_speed': {'handler': 'GfsVar'},
}
factory_kwargs = self.get('factory_kwargs', factory_kwargs)
if isinstance(factory_kwargs, str):
factory_kwargs = json.loads(factory_kwargs)
return factory_kwargs
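# Illustrative note (a sketch, not from the original source): the defaults
# above can be overridden through the event or an environment variable.
# Because a string value is passed through json.loads, a JSON-encoded
# override such as this hypothetical one should work:
#
#     {"factory_kwargs": "{\"aod\": {\"handler\": \"NrelVar\"}}"}
#
# Only the 'handler' key is shown since that is the only kwarg used in the
# defaults; any other keys would be assumptions.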
@property
def freq(self):
"""
Temporal frequency to run the NSRDB at, default is '5min'
Returns
-------
str
"""
return self.get('freq', '5min')
@property
def data_model(self):
"""
Completed NSRDB data model
Returns
-------
NSRDB.DataModel
"""
if self._data_model is None:
self._data_model = NSRDB.run_full(
self.day,
self.grid,
self.freq,
var_meta=self.var_meta,
factory_kwargs=self.factory_kwargs,
fill_all=self.get('fill_all', False),
low_mem=self.get('low_mem', False),
max_workers=self.get('max_workers', 1),
log_level=None,
)
return self._data_model
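# Note (not from the original source): NSRDB.run_full is only executed on
# first access to this property and the completed data model is cached on
# the instance for reuse.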
@staticmethod
def _parse_env_var(v):
"""
Convert the ENV var value to its native type if needed
Parameters
----------
v : str
ENV variable value as a string
Returns
-------
v : obj
ENV variable value converted to proper type
"""
with contextlib.suppress(json.JSONDecodeError):
v = json.loads(v)
return v
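# Illustrative examples (a sketch, not part of the original source):
# json.loads converts JSON-encoded strings to native types, e.g.
#     '1'                   -> 1
#     'true'                -> True
#     '{"max_workers": 4}'  -> {'max_workers': 4}
# while non-JSON strings such as 's3://bucket/grid.csv' raise a
# JSONDecodeError, which is suppressed, so they are returned unchanged.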
@staticmethod
def load_var_meta(var_meta_path, date, run_full_day=False):
"""
Load variable meta and update cloud variable pattern
Parameters
----------
var_meta_path : str
Path to variable meta .csv file
date : str
Date that NSRDB will be run for, used to create the cloud file pattern
run_full_day : bool, optional
Flag indicating if the entire day is going to be run or if the most
recent file should be run, by default False
Returns
-------
var_meta : pandas.DataFrame
Variable meta table with the cloud variable pattern updated
timestep : int
Timestep of newest file to run, None if run_full_day is True
"""
var_meta = pd.read_csv(var_meta_path)
date = NSRDB.to_datetime(date)
year = date.strftime('%Y')
cloud_vars = var_meta['data_source'] == 'UW-GOES'
var_meta.loc[cloud_vars, 'pattern'] = (
var_meta.loc[cloud_vars, 'source_directory']
.apply(lambda d: os.path.join(d, year, '{doy}', '*.nc'))
.values
)
if not run_full_day:
name = var_meta.loc[cloud_vars, 'var'].values[0]
cloud_files = CloudVar(name, var_meta, date).file_df
timestep = np.where(~cloud_files['flist'].isna())[0].max()
var_meta.loc[cloud_vars, 'pattern'] = cloud_files.iloc[
timestep
].values[0]
else:
timestep = None
return var_meta, timestep
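# Illustrative note (assumption, not from the original source): the pattern
# built above looks like '<source_directory>/<year>/{doy}/*.nc', where
# '{doy}' is presumably filled in with the day of year by the CloudVar
# handler. When run_full_day is False, the pattern is instead narrowed to
# the single newest cloud file available for that day.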
@staticmethod
def update_timestep(out_fpath, data_model, timestep):
"""
Update the given timestep in out_fpath with data in data_model
Parameters
----------
out_fpath : str
Path to output .h5 file to update
data_model : nsrdb.DataModel
NSRDB DataModel with computed data for given timestep
timestep : int
Position of timestep to update
"""
with h5py.File(out_fpath, mode='a') as f:
dump_vars = [
v for v in f if v not in ['time_index', 'meta', 'coordinates']
]
for v in dump_vars:
ds = f[v]
ds[timestep] = data_model[v][timestep]
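# Illustrative note (assumption, not from the original source): the row
# assignment above implies a time-major layout, i.e. each dataset is shaped
# (time, sites), so ds[timestep] = ... overwrites the single row for that
# timestep and leaves the rest of the file untouched.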
def get_out_vars(self, data_model):
"""
Determine which variables to dump to .h5
Parameters
----------
data_model : NSRDB.DataModel
Completed NSRDB data model
Returns
-------
out_vars : list
List of NSRDB variables to dump to .h5
"""
if not self.get('debug_dsets', False):
out_vars = [
'ghi',
'dni',
'dhi',
'clearsky_ghi',
'clearsky_dni',
'clearsky_dhi',
'fill_flag',
]
else:
out_vars = [
d
for d in list(data_model.processed_data.keys())
if d not in ('time_index', 'meta', 'flag')
]
return out_vars
def dump_to_h5(self, data_model):
"""
Dump NSRDB data to a .h5 file. Depending on the input parameters, one of
the following occurs:
1) Run the NSRDB for the full day and dump all data to .h5; this will
include gap-filled data using MlClouds.
2) Update a single timestep of NSRDB data corresponding to the most
recently created CLAVR-X file. This will ignore gap-filled data.
Parameters
----------
data_model : NSRDB.DataModel
Completed NSRDB data model
"""
with tempfile.TemporaryDirectory(
prefix=f'NSRDB_{self.day}_', dir=self.temp_dir
) as temp_dir:
local_out = os.path.join(temp_dir, self.fname)
out_vars = self.get_out_vars(data_model)
self.logger.info(
'Dumping data for {} to {}'.format(out_vars, local_out)
)
if not self.get('run_full_day', False):
if FileSystem(self.fpath_out).exists():
self.logger.debug(
'Copying {} to {} to fill in newest '
'timestep'.format(self.fpath_out, local_out)
)
FileSystem.copy(self.fpath_out, local_out)
else:
self.logger.debug('Initializing {}'.format(local_out))
nsrdb = NSRDB(
temp_dir,
self.year,
self.grid,
freq=self.freq,
var_meta=self.var_meta,
make_out_dirs=False,
)
nsrdb._init_output_h5(
local_out,
out_vars,
data_model.nsrdb_ti,
data_model.nsrdb_grid,
)
self.logger.debug(
'Updating variable data in {} at timestep {}'.format(
local_out, self.timestep
)
)
self.update_timestep(local_out, data_model, self.timestep)
else:
self.logger.debug(
'Dumping the entire day\'s worth of data for '
'{} in {}'.format(out_vars, local_out)
)
for v in out_vars:
try:
data_model.dump(v, local_out, None, mode='a')
except Exception as e:
msg = f'Could not write "{v}" to disk, got error: {e}'
self.logger.warning(msg)
FileSystem.copy(local_out, self.fpath_out)
@classmethod
def run(cls, event, context=None):
"""
Run NSRDB from given event and update .h5 file on S3
Parameters
----------
event : dict
The event dict that contains the parameters sent when the function
is invoked.
context : dict, optional
The context in which the function is called.
"""
nsrdb = cls(event)
nsrdb.logger.debug(f'event: {event}')
nsrdb.logger.debug(f'context: {context}')
var_meta = nsrdb['var_meta']
nsrdb.logger.debug(
f'NSRDB inputs:'
f'\nday = {nsrdb.day}'
f'\ngrid = {nsrdb.grid}'
f'\nfreq = {nsrdb.freq}'
f'\nvar_meta = {var_meta}'
f'\nfactory_kwargs = {nsrdb.factory_kwargs}'
)
try:
nsrdb.dump_to_h5(nsrdb.data_model)
except Exception:
nsrdb.logger.exception('Failed to run NSRDB!')
raise
success = {
'statusCode': 200,
'body': json.dumps(
'NSRDB ran successfully and '
f'created/updated {nsrdb.fpath_out}'
),
}
return success
def handler(event, context):
"""
Wrapper for NSRDB to allow AWS Lambda invocation
Parameters
----------
event : dict
The event dict that contains the parameters sent when the function
is invoked.
context : dict
The context in which the function is called.
"""
return LambdaHandler.run(event, context=context)
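# A minimal sketch of an invocation event (hypothetical paths and values,
# not from the original source). The keys mirror the lookups performed by
# LambdaHandler: 'grid', 'out_dir', 'file_prefix', and 'var_meta' must be
# supplied either in the event or as environment variables; everything else
# falls back to the defaults shown in the properties above.
#
#     example_event = {
#         'grid': 's3://example-bucket/nsrdb/grid.csv',          # hypothetical
#         'out_dir': 's3://example-bucket/nsrdb/output',         # hypothetical
#         'file_prefix': 'nsrdb',                                # hypothetical
#         'var_meta': 's3://example-bucket/nsrdb/var_meta.csv',  # hypothetical
#         'date': '20240101',   # optional, defaults to today (UTC)
#         'freq': '5min',       # optional
#         'run_full_day': False,
#         'max_workers': 1,
#         'verbose': True,
#     }
#     handler(example_event, None)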
if __name__ == '__main__':
if len(sys.argv) > 1:
event = safe_json_load(sys.argv[1])
ts = time.time()
LambdaHandler.run(event)
print(
'NSRDB lambda runtime: {:.4f} minutes'.format(
(time.time() - ts) / 60
)
)
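# For local testing, the script can be invoked with a path to a JSON file
# holding the event (safe_json_load reads a JSON file from a path), e.g.
# with a hypothetical event.json:
#
#     python lambda_handler.py event.json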