Skip to content

Commit

Permalink
Merge pull request #96 from kirk0830/spillapi-refactor-3
Browse files Browse the repository at this point in the history
Refactor: add unittest for spillage/api.py
  • Loading branch information
kirk0830 authored Sep 29, 2024
2 parents 173b64c + 25235f0 commit e08ab10
Showing 1 changed file with 119 additions and 64 deletions.
183 changes: 119 additions & 64 deletions SIAB/spillage/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def _coef_gen(rcut: float, ecut: float, lmax: int, value: str = "eye"):
Returns
-------
list: the value requested
list: the coefficients of the orbitals, indexed by [type][l][zeta][q]
"""

if not isinstance(ecut, (int, float)):
Expand Down Expand Up @@ -63,41 +63,46 @@ def _coef_restart(fcoef: str):
"""
return read_param(fcoef)

def _coef_opt_env_init(orbparam: list, folders: list):
"""the length of this function will continuously increase once there are new
feature requests
def _band_indexing(nbands, indexes, folders):
"""calculate the band indexes for orbital optimization.
Parameters
----------
orbparam: list
params set for each orbital
folders: list
the folders where the ABACUS run information are stored
nbands: int|list[int]
the number of bands for each geometry
indexes: list[int]
the indexes of the geometries
folders: list[list[str]]
the folders where the ABACUS run information are stored. The first level of list
is the geometry like dimer, trimer, etc, the second level is the folders of the
deformation of the geometry, such as stretching, bending, etc.
Returns
-------
list[list[list[int]]]: the band indexes for each shell of orbitals
list: the band indexes for each geometry
Notes
-----
This function now handles two circumstances:
1. nbands is a list.
this means there are multiple geometries were set as reference structures,
and the elem of list specifies the number of bands for each geometry.
2. nbands is an scalar.
"""
# Request: taking different geometry configurations into account
ibands = [[] for _ in range(len(orbparam))]
for iorb, orb in enumerate(orbparam):
if isinstance(orb['nbands_ref'], list):
ibands[iorb] = flatten([[range(n)]*len(folders[find]) \
for find, n in zip(orb['folder'], orb['nbands_ref'])])
else:
ibands[iorb] = orb['nbands_ref']

return ibands

def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
if isinstance(nbands, list):
return flatten([[range(n)]*len(folders[i]) for i, n in zip(indexes, nbands)])
else:
return nbands

def _coef_opt_jy(rcut, orbparams, folders, options, nthreads, spill_coefs = None):
"""for fit_basis jy case, optimize Spillage function to get contraction coefficients of jy for one single rcut value
Parameters
----------
rcut: float
the cutoff radius
siab_settings: dict
the settings for SIAB optimization
orbparams: list
list of settings for generation of each orbital
folders: list[list[str]]
the folders where the ABACUS run information are stored. The first level of list
is the geometry like dimer, trimer, etc, the second level is the folders of the
Expand All @@ -113,8 +118,8 @@ def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
"""
minimizer = Spillage_jy()
print(f"ORBGEN: Optimizing orbitals for rcut = {rcut} au", flush = True)

orbparams = siab_settings['orbitals']
if spill_coefs:
print("ORBGEN: For jy, spill_coefs is deprecated.", flush = True)

configs = [folders[indf] for orb in orbparams for indf in orb['folder']]
configs = list(set([folder for f in configs for folder in f]))
Expand All @@ -126,11 +131,13 @@ def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
for folder in configs:
minimizer.config_add(os.path.join(folder, f"OUT.{os.path.basename(folder)}"))


# infer nzeta if `zeta_notation` specified as `auto`, this cause the `nzeta` to be `auto`
nbands_ref = [[orb['nbands_ref']] if not isinstance(orb['nbands_ref'], list) else orb['nbands_ref']\
for orb in orbparams]
nbands_ref = [[nbands_ref[iorb]*len(folders[find]) for find in orb['folder']]
nbands_ref = [[nbands_ref[iorb]*len(folders[i]) for i in orb['folder']]
for iorb, orb in enumerate(orbparams)]

nzeta = [orb['nzeta'] if orb['nzeta'] != "auto" else\
_nzeta_mean_conf(flatten(nbands_ref[iorb]), flatten([folders[i] for i in orb['folder']]))\
for iorb, orb in enumerate(orbparams)]
Expand All @@ -141,9 +148,9 @@ def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
elem = flatten(folders)[0].split("-")[0]
initdir = "-".join([elem, "monomer", f"{rcut}au"])
initdir = os.path.join(initdir, f"OUT.{os.path.basename(initdir)}")
guess = _initguess(nzeta, initdir, True, True)
guess = _make_guess(nzeta, initdir, True, True)

ibands = _coef_opt_env_init(siab_settings['orbitals'], folders)
ibands = [_band_indexing(orb['nbands_ref'], orb['folder'], folders) for orb in orbparams]
deps = [orb['nzeta_from'] for orb in orbparams]

return _do_onion_opt(minimizer,
Expand All @@ -155,15 +162,15 @@ def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
options,
guess)

def _coef_opt_pw(rcut, siab_settings, folders, options, nthreads):
def _coef_opt_pw(rcut, orbparams, folders, options, nthreads, spill_coefs = None):
"""for fit_basis pw case, optimize Spillage function to get contraction coefficients of pw for one single rcut value
Parameters
----------
rcut: float
the cutoff radius
siab_settings: dict
the settings for SIAB optimization
orbparams: list
list of settings for generation of each orbital
folders: list[list[str]]
the folders where the ABACUS run information are stored. The first level of list
is the geometry like dimer, trimer, etc, the second level is the folders of the
Expand All @@ -181,7 +188,7 @@ def _coef_opt_pw(rcut, siab_settings, folders, options, nthreads):
minimizer = Spillage_pw()
print(f"ORBGEN: Optimizing orbitals for rcut = {rcut} au", flush = True)

orbparams = siab_settings['orbitals']
spill_coefs = [0.0, 1.0] if spill_coefs is None else spill_coefs

configs = [folders[indf] for orb in orbparams for indf in orb['folder']]
configs = list(set([folder for f in configs for folder in f]))
Expand All @@ -197,36 +204,29 @@ def _coef_opt_pw(rcut, siab_settings, folders, options, nthreads):
assert ov['rcut'] == op['rcut'], "Data violation: rcut of ov and op matrices are different"
if np.abs(ov['rcut'] - rcut) < 1e-10:
print(f"ORBGEN: jy_jy, mo_jy and mo_mo matrices loaded from {fov_} and {fop_}", flush = True)
minimizer.config_add(fov_, fop_, siab_settings.get('spill_coefs', [0.0, 1.0]))
minimizer.config_add(fov_, fop_, spill_coefs)
fov = fov_ if fov is None else fov

nbands_ref = [[orb['nbands_ref']] if not isinstance(orb['nbands_ref'], list) else orb['nbands_ref']\
for orb in siab_settings['orbitals']]
nbands_ref = [[nbands_ref[iorb]*len(folders[find]) for find in orb['folder']]
for iorb, orb in enumerate(siab_settings['orbitals'])]
nzeta = [orb['nzeta'] for orb in siab_settings['orbitals']]

nzeta = [orb['nzeta'] for orb in orbparams]

# prepare opt params
elem = flatten(folders)[0].split("-")[0]
initdir = "-".join([elem, "monomer"])
initdir = os.path.join(initdir, os.path.basename(fov))
guess = _initguess(nzeta, initdir, False, True)
ibands = _coef_opt_env_init(siab_settings['orbitals'], folders)
deps = [orb['nzeta_from'] for orb in orbparams]
guess = _make_guess(nzeta, initdir, False, True)
ibands = [_band_indexing(orb['nbands_ref'], orb['folder'], folders) for orb in orbparams]
ideps = [orb['nzeta_from'] for orb in orbparams]

return _do_onion_opt(minimizer,
nzeta,
iconfs,
ibands,
deps,
ideps,
nthreads,
options,
guess)

def _initguess(nzeta: list,
folder: str,
jy: bool = True,
diagnosis: bool = True):
def _make_guess(nzeta, folder, jy = True, diagnosis = True):
"""initialize the coef_guess for both jy and pw basis. calculate the maximal
number of zeta func needed by each angular momentum.
Expand Down Expand Up @@ -256,7 +256,7 @@ def _initguess(nzeta: list,
return dict(zip(keys, [nzeta_max, diagnosis, folder]))

def _do_onion_opt(minimizer, nzeta, iconfs, ibands, deps, nthreads, options, guess):
"""
"""Onion! optimize the contraction coefficients of jy from inner to outer step by step.
Based on the contraction coefficients of jy finding problem to the Spillage function
minimization problem, the optimization is performed in a hierarchical way: from
core, shell to outer shell. Once the optimization of one shell is done, during the
Expand All @@ -280,7 +280,7 @@ def _do_onion_opt(minimizer, nzeta, iconfs, ibands, deps, nthreads, options, gue
options: dict
the options for optimization
guess: dict
the initial guess configuration, see function _initguess for details
the initial guess configuration, see function _make_guess for details
Returns
-------
Expand Down Expand Up @@ -308,29 +308,40 @@ def _do_onion_opt(minimizer, nzeta, iconfs, ibands, deps, nthreads, options, gue
print(f"ORBGEN: End optimization on level {iorb + 1} orbital, merge with previous orbital shell(s).", flush = True)
return coefs

def _coef_opt(rcut: float, siab_settings: dict, folders: list, jy: bool = False):
def _coef_opt(rcut, orbparams, folders, maxiter, nthreads, jy, spill_coefs = None):
"""optimize Spillage function to get contraction coefficients of jy for one single rcut value
Parameters
----------
rcut: float
the cutoff radius
siab_settings: dict
the settings for SIAB optimization
orbparams: list
list of settings for generation of each orbital
folders: list[list[str]]
the folders where the ABACUS run information are stored
the folders where the ABACUS run information are stored, indexed by [geom][deform]
maxiter: int
the maximum number of iterations
nthreads: int
the number of threads used in optimization
jy: bool
whether the jY basis is used
spill_coefs: list[float]
the spillage coefficients, not used when set fit_basis as jy
Returns
-------
list[list[list[float]]]: the coefficients of the orbitals
"""
call = _coef_opt_jy if jy else _coef_opt_pw
option = {"maxiter": siab_settings.get("max_steps", 2000),
option = {"maxiter": maxiter,
"disp": True, "ftol": 0, "gtol": 1e-6, 'maxcor': 20}
nthreads = siab_settings.get("nthreads", 4)
return call(rcut, siab_settings, folders, option, nthreads)

return call(rcut,
orbparams,
folders,
option,
nthreads,
spill_coefs)

def _peel(coef, nzeta_lvl_tot):
"""peel the coefficients of the orbitals to different levels
Expand Down Expand Up @@ -569,25 +580,32 @@ def iter(siab_settings: dict, calculation_settings: list, folders: list):
run_map = {"none": "none", "restart": "restart", "bfgs": "opt"}
run_type = run_map.get(siab_settings.get("optimizer", "none"), "none")
for rcut in rcuts: # can be parallelized here

##############
# Generation #
##############
# for jy basis calculation, only matched rcut folders are needed
if run_type == "opt":
# REFACTOR: SIAB-v3.0, get folders with matched rcut
f_ = [[f for f in fgrp if len(f.split("-")) == 3 or \
float(f.split("-")[-1].replace("au", "")) == rcut] # jy case
for fgrp in folders]
jy = [f for f in folders if len(f) > 0][0][0][-2:] == "au"
coefs_tot = _coef_opt(rcut, siab_settings, f_, jy)
# optimize a cascade of orbitals is reasonable because the orbitals always
# have a hierarchical structure like
# SZ
# [SZ] SZP = DZP
# [DZP] SZP = TZDP
coefs_tot = _coef_opt(rcut,
siab_settings['orbitals'],
f_,
siab_settings.get("max_steps", 2000),
siab_settings.get("nthreads", 4),
jy,
siab_settings.get("spill_coefs", None))
elif run_type == "restart":
raise NotImplementedError("restart is not implemented yet")
else: # run_type == "none", used to generate jY basis
coefs_tot = [_coef_gen(rcut, ecut, len(orb['nzeta']) - 1) for orb in siab_settings['orbitals']]

# save orbitals
#################
# save orbitals #
#################
for ilev, coefs in enumerate(coefs_tot): # loop over different levels...
folder = "_".join([elem, f"{rcut}au", f"{ecut}Ry"]) # because the concept of "level" is not clear
for coefs_it in coefs: # loop over different atom types
Expand Down Expand Up @@ -754,6 +772,43 @@ def _nzeta_analysis(folder, count_thr = 1e-1, itype = 0):

class TestAPI(unittest.TestCase):

def test_coef_gen(self):

rcut = 3.0
ecut = 10.0
lmax = 3
coefs = _coef_gen(rcut, ecut, lmax)
self.assertEqual(len(coefs), 1) # always only one atomtype
self.assertEqual(len(coefs[0]), lmax + 1) # lmax + 1 orbitals
for l in range(lmax + 1):
dim1 = len(coefs[0][l])
dim2 = len(coefs[0][l][0])
self.assertEqual(dim1, dim2)
self.assertEqual(coefs[0][l], np.eye(dim1).tolist())

def test_band_indexing(self):

folders = [["folder1", "folder2"], ["folder3", "folder4"]]
# test single folder
self.assertEqual(_band_indexing(10, [0], folders), 10)
# test multiple folders
self.assertEqual(_band_indexing([10, 12], [0, 1], folders), [range(10), range(10), range(12), range(12)])

def test_make_guess(self):

nzeta = [[3, 3, 2], [2, 2, 1], [1, 1]]
initdir = "initdir"
guess = _make_guess(nzeta, initdir, True, True)
self.assertEqual(guess["nzeta"], [3, 3, 2])
self.assertEqual(guess["diagnosis"], True)
self.assertEqual(guess["outdir"], "initdir")

nzeta = [[3, 1, 2], [2, 2, 1], [4, 1]]
guess = _make_guess(nzeta, initdir, False, True)
self.assertEqual(guess["nzeta"], [4, 2, 2])
self.assertEqual(guess["diagnosis"], True)
self.assertEqual(guess["orb_mat"], "initdir")

def test_orb_matrices(self):

test_folder = "test_orb_matrices"
Expand Down

0 comments on commit e08ab10

Please sign in to comment.