complying with complexity limits for ruff
bnb32 committed Jul 2, 2024
1 parent 8e5573f commit ef0b90b
Showing 15 changed files with 224 additions and 552 deletions.
16 changes: 8 additions & 8 deletions nsrdb/albedo/ims.py
@@ -114,7 +114,7 @@ class ImsError(Exception):
     """General exception for IMS processing"""


-class ImsDataNotFound(ImsError):
+class ImsDataNotFoundError(ImsError):
     """
     Raised when IMS data is not available on ftp server. This is typically
     caused by a missing day that needs to be gap-filled.
@@ -305,7 +305,7 @@ def get_files(self):
         try:
             ifa.get_file()
             self.data = ifa.data
-        except ImsDataNotFound:
+        except ImsDataNotFoundError:
             logger.info(
                 f'Data is missing or bad for {self.date}, '
                 'attempting to gap fill'
@@ -363,7 +363,7 @@ def fill_gap(self):
             ifa.get_file()
             logger.info('Gap-fill data found on disk')
             return ifa
-        except ImsDataNotFound:
+        except ImsDataNotFoundError:
             pass

         ifa = self._find_closest_day()
@@ -397,7 +397,7 @@ def _find_closest_day(self):

             try:
                 ifa.get_file()
-            except ImsDataNotFound:
+            except ImsDataNotFoundError:
                 continue

             logger.info(
@@ -436,7 +436,7 @@ class ImsFileAcquisition:
     disk first. If not on disk the data is downloaded.

     Files are acquired and loaded by calling self.get_file() after class is
-    initialized. ImsDataNotFound is raised if there is any issue obtaining
+    initialized. ImsDataNotFoundError is raised if there is any issue obtaining
     or loading data.

     It should be noted that for dates on and after 2014, 336, (Ver 1.3) the
@@ -556,7 +556,7 @@ def get_file(self):
                 )
             else:
                 if self._gap_fill:
-                    raise ImsDataNotFound(
+                    raise ImsDataNotFoundError(
                         f'Gap-fill file {self._pfilename} not ' 'found on disk'
                     )

@@ -565,7 +565,7 @@ def get_file(self):
                 ' download'
             )
             if not self._check_ftp_for_data():
-                raise ImsDataNotFound(
+                raise ImsDataNotFoundError(
                     f'{self._pfilename}.gz not found on ' f'{FTP_SERVER}'
                 )
             self._download_data()
@@ -649,7 +649,7 @@ def _load_data(self):
                 f'{length} but is {len(raw)}.'
             )
             logger.warning(msg)
-            raise ImsDataNotFound(msg)
+            raise ImsDataNotFoundError(msg)

         # Changed unpacked snow/ice values to match packed format
         raw = np.array(raw)
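The only change in this file is the exception rename, which likely addresses ruff's pep8-naming rule N818 (exception class names should end in `Error`) rather than a complexity limit as such. A minimal sketch of the resulting pattern; `fetch_ims_day` and the `closest_available_day` argument are hypothetical stand-ins for the real acquisition and gap-fill logic:

```python
class ImsError(Exception):
    """General exception for IMS processing."""


class ImsDataNotFoundError(ImsError):
    """Raised when IMS data is missing and should be gap-filled."""


def fetch_ims_day(date):
    # Hypothetical stand-in for ImsFileAcquisition.get_file().
    raise ImsDataNotFoundError(f'No IMS data found for {date}')


def get_day_with_gap_fill(date, closest_available_day):
    try:
        return fetch_ims_day(date)
    except ImsDataNotFoundError:
        # Missing day: fall back to the closest available day,
        # mirroring the fill_gap() flow in the diff above.
        return closest_available_day(date)
```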
131 changes: 74 additions & 57 deletions nsrdb/all_sky/all_sky.py
@@ -368,6 +368,72 @@ def all_sky_h5(f_source, rows=slice(None), cols=slice(None), disc_on=False):
     return out


+def _all_sky_h5_parallel(
+    f_source,
+    rows=slice(None),
+    cols=slice(None),
+    col_chunk=10,
+    max_workers=None,
+    disc_on=False,
+):
+    out_shape = (rows.stop - rows.start, cols.stop - cols.start)
+    c_range = range(cols.start, cols.stop, col_chunk)
+    c_slices_all = {}
+    for c in c_range:
+        c_slice = slice(c, np.min((c + col_chunk, cols.stop)))
+        c_slices_all[c] = c_slice
+
+    out = {}
+    completed = 0
+
+    logger.info('Running all-sky in parallel on %s workers.', max_workers)
+
+    loggers = ['farms', 'nsrdb', 'rest2', 'rex']
+    with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe:
+        futures = {
+            exe.submit(
+                all_sky_h5,
+                f_source,
+                rows=rows,
+                disc_on=disc_on,
+                cols=c_slices_all[c],
+            ): c
+            for c in c_range
+        }
+
+        for future in as_completed(futures):
+            c = futures[future]
+            c_slice = c_slices_all[c]
+            all_sky_out = future.result()
+
+            for var, arr in all_sky_out.items():
+                if var not in out:
+                    logger.info(
+                        'Initializing output array for "%s" with shape %s '
+                        'and dtype %s.',
+                        var,
+                        out_shape,
+                        arr.dtype,
+                    )
+                    out[var] = np.ndarray(out_shape, dtype=arr.dtype)
+                out[var][:, c_slice] = arr
+
+            completed += 1
+            mem = psutil.virtual_memory()
+
+            if completed % 10 == 0:
+                logger.info(
+                    'All-sky futures completed: {} out of {}. Current memory '
+                    'usage is {:.3f} GB out of {:.3f} GB total.'.format(
+                        completed,
+                        len(futures),
+                        mem.used / 1e9,
+                        mem.total / 1e9,
+                    )
+                )
+    return out
+
+
 def all_sky_h5_parallel(
     f_source,
     rows=slice(None),
@@ -436,60 +502,11 @@ def all_sky_h5_parallel(
     logger.info('Running all-sky for rows: %s', rows)
     logger.info('Running all-sky for cols: %s', cols)

-    out_shape = (rows.stop - rows.start, cols.stop - cols.start)
-    c_range = range(cols.start, cols.stop, col_chunk)
-    c_slices_all = {}
-    for c in c_range:
-        c_slice = slice(c, np.min((c + col_chunk, cols.stop)))
-        c_slices_all[c] = c_slice
-
-    out = {}
-    completed = 0
-
-    logger.info('Running all-sky in parallel on %s workers.', max_workers)
-
-    loggers = ['farms', 'nsrdb', 'rest2', 'rex']
-    with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe:
-        futures = {
-            exe.submit(
-                all_sky_h5,
-                f_source,
-                rows=rows,
-                disc_on=disc_on,
-                cols=c_slices_all[c],
-            ): c
-            for c in c_range
-        }
-
-        for future in as_completed(futures):
-            c = futures[future]
-            c_slice = c_slices_all[c]
-            all_sky_out = future.result()
-
-            for var, arr in all_sky_out.items():
-                if var not in out:
-                    logger.info(
-                        'Initializing output array for "%s" with shape %s '
-                        'and dtype %s.',
-                        var,
-                        out_shape,
-                        arr.dtype,
-                    )
-                    out[var] = np.ndarray(out_shape, dtype=arr.dtype)
-                out[var][:, c_slice] = arr
-
-            completed += 1
-            mem = psutil.virtual_memory()
-
-            if completed % 10 == 0:
-                logger.info(
-                    'All-sky futures completed: {} out of {}. Current memory '
-                    'usage is {:.3f} GB out of {:.3f} GB total.'.format(
-                        completed,
-                        len(futures),
-                        mem.used / 1e9,
-                        mem.total / 1e9,
-                    )
-                )
-
-    return out
+    return _all_sky_h5_parallel(
+        f_source=f_source,
+        rows=rows,
+        cols=cols,
+        col_chunk=col_chunk,
+        max_workers=max_workers,
+        disc_on=disc_on,
+    )
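This is the main complexity fix in the commit: the body of `all_sky_h5_parallel` moves verbatim into a private `_all_sky_h5_parallel` helper, so the public function keeps its docstring and argument handling while dropping below ruff's function-complexity threshold (e.g. the mccabe `C901` check, configured via `max-complexity`). A minimal sketch of the same delegation pattern, with a hypothetical `_square` worker standing in for `all_sky_h5`:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def _square(x):
    # Hypothetical per-chunk worker standing in for all_sky_h5.
    return x * x


def _run_parallel(items, max_workers=None):
    # Private helper: all of the pool bookkeeping lives here, keeping
    # the public entry point small enough for the complexity limit.
    out = {}
    with ProcessPoolExecutor(max_workers=max_workers) as exe:
        futures = {exe.submit(_square, item): item for item in items}
        for future in as_completed(futures):
            out[futures[future]] = future.result()
    return out


def run_parallel(items, max_workers=None):
    """Public entry point: keep the docstring and validation, delegate."""
    return _run_parallel(list(items), max_workers=max_workers)


if __name__ == '__main__':
    # Process pools need the __main__ guard under spawn-based start methods.
    print(run_parallel(range(5), max_workers=2))
```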
2 changes: 1 addition & 1 deletion nsrdb/blend/cli.py
@@ -156,7 +156,7 @@ def main(
     python -m nsrdb.blend.cli -n "blend5" -m $META -od "./" -ed $EDIR -wd $WDIR -t "irradiance" -mc $MAPCOL -ls -105.0 -cs 100000 -ld "./logs/" slurm -a "pxs" -wt 48.0 -l "--qos=normal" -mem "83" -sout "./logs/"
     python -m nsrdb.blend.cli -n "blend6" -m $META -od "./" -ed $EDIR -wd $WDIR -t "pv" -mc $MAPCOL -ls -105.0 -cs 100000 -ld "./logs/" slurm -a "pxs" -wt 48.0 -l "--qos=normal" -mem "83" -sout "./logs/"
     ```
-    """
+    """ # noqa: E501

     if log_dir is None:
         log_dir = os.path.join(out_dir, 'logs/')
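The `main` docstring embeds long example shell commands that cannot be wrapped without breaking copy-paste, so they trip ruff's line-length rule `E501`. Since a comment cannot appear inside a string literal, ruff expects the `# noqa` directive for a violation inside a multi-line string to sit on the line with the closing quotes, where it applies to the whole string; that is what this hunk does. A small sketch of the pattern with a hypothetical command:

```python
def main():
    """Run the blend step.

    Example (too long to wrap without breaking copy-paste):

    python -m nsrdb.blend.cli -n "blend1" -m $META -od "./" -t "irradiance" -cs 100000 -ld "./logs/"
    """  # noqa: E501
```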
19 changes: 12 additions & 7 deletions nsrdb/data_model/clouds.py
@@ -4,8 +4,8 @@
 import logging
 import os
 import re
-from warnings import warn
 from typing import ClassVar
+from warnings import warn

 import numpy as np
 import pandas as pd
@@ -469,9 +469,13 @@ def _clean_dup_coords(grid):

         dup_mask = grid.duplicated() & ~grid['latitude'].isna()
         if any(dup_mask):
-            rand_mult = np.random.Generator(0.99, 1.01, dup_mask.sum())
+            rand_mult = np.random.default_rng().uniform(
+                0.99, 1.01, dup_mask.sum()
+            )
             grid.loc[dup_mask, 'latitude'] *= rand_mult
-            rand_mult = np.random.Generator(0.99, 1.01, dup_mask.sum())
+            rand_mult = np.random.default_rng().uniform(
+                0.99, 1.01, dup_mask.sum()
+            )
             grid.loc[dup_mask, 'longitude'] *= rand_mult

             wmsg = (
@@ -1354,8 +1358,9 @@ def _check_freq(self):
             logger.warning(w)
             warn(w)
         else:
-            m = 'CloudVar handler has a frequency of "{}" for pattern: {}'.format(
-                self.freq, self.pattern
+            m = (
+                f'CloudVar handler has a frequency of "{self.freq}" for '
+                f'pattern: {self.pattern}'
             )
             logger.debug(m)

@@ -1518,8 +1523,8 @@ def _remove_bad_files(flist):

         for fp in flist:
             if NSRDBfs(fp).size() < 1e6:
-                msg = 'Cloud data source file is less than 1MB, skipping: {}'.format(
-                    fp
+                msg = (
+                    f'Cloud data source file is less than 1MB, skipping: {fp}'
                 )
                 warn(msg)
                 logger.warning(msg)
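Beyond the f-string conversions (which also clear `E501` overruns), this file gets a genuine bug fix: `np.random.Generator` is a class that expects a `BitGenerator`, so calling `np.random.Generator(0.99, 1.01, n)` raises a `TypeError` rather than drawing samples, while `np.random.default_rng().uniform(low, high, size)` is the supported NumPy API for this draw. A small illustration of the duplicate-coordinate nudge; the seed is added here for reproducibility and is not in the original:

```python
import numpy as np
import pandas as pd

grid = pd.DataFrame({
    'latitude': [40.0, 40.0, 41.0],
    'longitude': [-105.0, -105.0, -104.0],
})

# Rows that exactly duplicate an earlier row.
dup_mask = grid.duplicated()

rng = np.random.default_rng(seed=0)  # seeded only for this illustration
# Nudge duplicates by a random factor in [0.99, 1.01) so that
# downstream nearest-neighbor lookups treat them as distinct points.
grid.loc[dup_mask, 'latitude'] *= rng.uniform(0.99, 1.01, dup_mask.sum())
grid.loc[dup_mask, 'longitude'] *= rng.uniform(0.99, 1.01, dup_mask.sum())
print(grid)
```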
(Diffs for the remaining 11 changed files are not shown.)
