Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: add unittest for spillage/api.py #96

Merged
merged 2 commits into from
Sep 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 119 additions & 64 deletions SIAB/spillage/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def _coef_gen(rcut: float, ecut: float, lmax: int, value: str = "eye"):

Returns
-------
list: the value requested
list: the coefficients of the orbitals, indexed by [type][l][zeta][q]
"""

if not isinstance(ecut, (int, float)):
Expand Down Expand Up @@ -63,41 +63,46 @@ def _coef_restart(fcoef: str):
"""
return read_param(fcoef)

def _coef_opt_env_init(orbparam: list, folders: list):
"""the length of this function will continuously increase once there are new
feature requests
def _band_indexing(nbands, indexes, folders):
"""calculate the band indexes for orbital optimization.

Parameters
----------
orbparam: list
params set for each orbital
folders: list
the folders where the ABACUS run information are stored

nbands: int|list[int]
the number of bands for each geometry
indexes: list[int]
the indexes of the geometries
folders: list[list[str]]
the folders where the ABACUS run information are stored. The first level of list
is the geometry like dimer, trimer, etc, the second level is the folders of the
deformation of the geometry, such as stretching, bending, etc.

Returns
-------
list[list[list[int]]]: the band indexes for each shell of orbitals
list: the band indexes for each geometry

Notes
-----
This function now handles two circumstances:
1. nbands is a list.
this means there are multiple geometries were set as reference structures,
and the elem of list specifies the number of bands for each geometry.
2. nbands is an scalar.
"""
# Request: taking different geometry configurations into account
ibands = [[] for _ in range(len(orbparam))]
for iorb, orb in enumerate(orbparam):
if isinstance(orb['nbands_ref'], list):
ibands[iorb] = flatten([[range(n)]*len(folders[find]) \
for find, n in zip(orb['folder'], orb['nbands_ref'])])
else:
ibands[iorb] = orb['nbands_ref']

return ibands

def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
if isinstance(nbands, list):
return flatten([[range(n)]*len(folders[i]) for i, n in zip(indexes, nbands)])
else:
return nbands
kirk0830 marked this conversation as resolved.
Show resolved Hide resolved

def _coef_opt_jy(rcut, orbparams, folders, options, nthreads, spill_coefs = None):
"""for fit_basis jy case, optimize Spillage function to get contraction coefficients of jy for one single rcut value

Parameters
----------
rcut: float
the cutoff radius
siab_settings: dict
the settings for SIAB optimization
orbparams: list
list of settings for generation of each orbital
folders: list[list[str]]
the folders where the ABACUS run information are stored. The first level of list
is the geometry like dimer, trimer, etc, the second level is the folders of the
Expand All @@ -113,8 +118,8 @@ def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
"""
minimizer = Spillage_jy()
print(f"ORBGEN: Optimizing orbitals for rcut = {rcut} au", flush = True)

orbparams = siab_settings['orbitals']
if spill_coefs:
print("ORBGEN: For jy, spill_coefs is deprecated.", flush = True)

configs = [folders[indf] for orb in orbparams for indf in orb['folder']]
configs = list(set([folder for f in configs for folder in f]))
Expand All @@ -126,11 +131,13 @@ def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
for folder in configs:
minimizer.config_add(os.path.join(folder, f"OUT.{os.path.basename(folder)}"))


# infer nzeta if `zeta_notation` specified as `auto`, this cause the `nzeta` to be `auto`
nbands_ref = [[orb['nbands_ref']] if not isinstance(orb['nbands_ref'], list) else orb['nbands_ref']\
for orb in orbparams]
nbands_ref = [[nbands_ref[iorb]*len(folders[find]) for find in orb['folder']]
nbands_ref = [[nbands_ref[iorb]*len(folders[i]) for i in orb['folder']]
for iorb, orb in enumerate(orbparams)]

nzeta = [orb['nzeta'] if orb['nzeta'] != "auto" else\
_nzeta_mean_conf(flatten(nbands_ref[iorb]), flatten([folders[i] for i in orb['folder']]))\
for iorb, orb in enumerate(orbparams)]
Expand All @@ -141,9 +148,9 @@ def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
elem = flatten(folders)[0].split("-")[0]
initdir = "-".join([elem, "monomer", f"{rcut}au"])
initdir = os.path.join(initdir, f"OUT.{os.path.basename(initdir)}")
guess = _initguess(nzeta, initdir, True, True)
guess = _make_guess(nzeta, initdir, True, True)

ibands = _coef_opt_env_init(siab_settings['orbitals'], folders)
ibands = [_band_indexing(orb['nbands_ref'], orb['folder'], folders) for orb in orbparams]
deps = [orb['nzeta_from'] for orb in orbparams]

return _do_onion_opt(minimizer,
Expand All @@ -155,15 +162,15 @@ def _coef_opt_jy(rcut, siab_settings, folders, options, nthreads):
options,
guess)

def _coef_opt_pw(rcut, siab_settings, folders, options, nthreads):
def _coef_opt_pw(rcut, orbparams, folders, options, nthreads, spill_coefs = None):
"""for fit_basis pw case, optimize Spillage function to get contraction coefficients of pw for one single rcut value

Parameters
----------
rcut: float
the cutoff radius
siab_settings: dict
the settings for SIAB optimization
orbparams: list
list of settings for generation of each orbital
folders: list[list[str]]
the folders where the ABACUS run information are stored. The first level of list
is the geometry like dimer, trimer, etc, the second level is the folders of the
Expand All @@ -181,7 +188,7 @@ def _coef_opt_pw(rcut, siab_settings, folders, options, nthreads):
minimizer = Spillage_pw()
print(f"ORBGEN: Optimizing orbitals for rcut = {rcut} au", flush = True)

orbparams = siab_settings['orbitals']
spill_coefs = [0.0, 1.0] if spill_coefs is None else spill_coefs

configs = [folders[indf] for orb in orbparams for indf in orb['folder']]
configs = list(set([folder for f in configs for folder in f]))
Expand All @@ -197,36 +204,29 @@ def _coef_opt_pw(rcut, siab_settings, folders, options, nthreads):
assert ov['rcut'] == op['rcut'], "Data violation: rcut of ov and op matrices are different"
if np.abs(ov['rcut'] - rcut) < 1e-10:
print(f"ORBGEN: jy_jy, mo_jy and mo_mo matrices loaded from {fov_} and {fop_}", flush = True)
minimizer.config_add(fov_, fop_, siab_settings.get('spill_coefs', [0.0, 1.0]))
minimizer.config_add(fov_, fop_, spill_coefs)
fov = fov_ if fov is None else fov

nbands_ref = [[orb['nbands_ref']] if not isinstance(orb['nbands_ref'], list) else orb['nbands_ref']\
for orb in siab_settings['orbitals']]
nbands_ref = [[nbands_ref[iorb]*len(folders[find]) for find in orb['folder']]
for iorb, orb in enumerate(siab_settings['orbitals'])]
nzeta = [orb['nzeta'] for orb in siab_settings['orbitals']]

nzeta = [orb['nzeta'] for orb in orbparams]

# prepare opt params
elem = flatten(folders)[0].split("-")[0]
initdir = "-".join([elem, "monomer"])
initdir = os.path.join(initdir, os.path.basename(fov))
guess = _initguess(nzeta, initdir, False, True)
ibands = _coef_opt_env_init(siab_settings['orbitals'], folders)
deps = [orb['nzeta_from'] for orb in orbparams]
guess = _make_guess(nzeta, initdir, False, True)
ibands = [_band_indexing(orb['nbands_ref'], orb['folder'], folders) for orb in orbparams]
ideps = [orb['nzeta_from'] for orb in orbparams]

return _do_onion_opt(minimizer,
nzeta,
iconfs,
ibands,
deps,
ideps,
nthreads,
options,
guess)

def _initguess(nzeta: list,
folder: str,
jy: bool = True,
diagnosis: bool = True):
def _make_guess(nzeta, folder, jy = True, diagnosis = True):
"""initialize the coef_guess for both jy and pw basis. calculate the maximal
number of zeta func needed by each angular momentum.

Expand Down Expand Up @@ -256,7 +256,7 @@ def _initguess(nzeta: list,
return dict(zip(keys, [nzeta_max, diagnosis, folder]))

def _do_onion_opt(minimizer, nzeta, iconfs, ibands, deps, nthreads, options, guess):
"""
"""Onion! optimize the contraction coefficients of jy from inner to outer step by step.
Based on the contraction coefficients of jy finding problem to the Spillage function
minimization problem, the optimization is performed in a hierarchical way: from
core, shell to outer shell. Once the optimization of one shell is done, during the
Expand All @@ -280,7 +280,7 @@ def _do_onion_opt(minimizer, nzeta, iconfs, ibands, deps, nthreads, options, gue
options: dict
the options for optimization
guess: dict
the initial guess configuration, see function _initguess for details
the initial guess configuration, see function _make_guess for details

Returns
-------
Expand Down Expand Up @@ -308,29 +308,40 @@ def _do_onion_opt(minimizer, nzeta, iconfs, ibands, deps, nthreads, options, gue
print(f"ORBGEN: End optimization on level {iorb + 1} orbital, merge with previous orbital shell(s).", flush = True)
return coefs

def _coef_opt(rcut: float, siab_settings: dict, folders: list, jy: bool = False):
def _coef_opt(rcut, orbparams, folders, maxiter, nthreads, jy, spill_coefs = None):
"""optimize Spillage function to get contraction coefficients of jy for one single rcut value

Parameters
----------
rcut: float
the cutoff radius
siab_settings: dict
the settings for SIAB optimization
orbparams: list
list of settings for generation of each orbital
folders: list[list[str]]
the folders where the ABACUS run information are stored
the folders where the ABACUS run information are stored, indexed by [geom][deform]
maxiter: int
the maximum number of iterations
nthreads: int
the number of threads used in optimization
jy: bool
whether the jY basis is used
spill_coefs: list[float]
the spillage coefficients, not used when set fit_basis as jy

Returns
-------
list[list[list[float]]]: the coefficients of the orbitals
"""
call = _coef_opt_jy if jy else _coef_opt_pw
option = {"maxiter": siab_settings.get("max_steps", 2000),
option = {"maxiter": maxiter,
"disp": True, "ftol": 0, "gtol": 1e-6, 'maxcor': 20}
nthreads = siab_settings.get("nthreads", 4)
return call(rcut, siab_settings, folders, option, nthreads)

return call(rcut,
orbparams,
folders,
option,
nthreads,
spill_coefs)

def _peel(coef, nzeta_lvl_tot):
"""peel the coefficients of the orbitals to different levels
Expand Down Expand Up @@ -569,25 +580,32 @@ def iter(siab_settings: dict, calculation_settings: list, folders: list):
run_map = {"none": "none", "restart": "restart", "bfgs": "opt"}
run_type = run_map.get(siab_settings.get("optimizer", "none"), "none")
for rcut in rcuts: # can be parallelized here

##############
# Generation #
##############
# for jy basis calculation, only matched rcut folders are needed
if run_type == "opt":
# REFACTOR: SIAB-v3.0, get folders with matched rcut
f_ = [[f for f in fgrp if len(f.split("-")) == 3 or \
float(f.split("-")[-1].replace("au", "")) == rcut] # jy case
for fgrp in folders]
jy = [f for f in folders if len(f) > 0][0][0][-2:] == "au"
coefs_tot = _coef_opt(rcut, siab_settings, f_, jy)
# optimize a cascade of orbitals is reasonable because the orbitals always
# have a hierarchical structure like
# SZ
# [SZ] SZP = DZP
# [DZP] SZP = TZDP
coefs_tot = _coef_opt(rcut,
kirk0830 marked this conversation as resolved.
Show resolved Hide resolved
siab_settings['orbitals'],
f_,
siab_settings.get("max_steps", 2000),
siab_settings.get("nthreads", 4),
jy,
siab_settings.get("spill_coefs", None))
elif run_type == "restart":
raise NotImplementedError("restart is not implemented yet")
else: # run_type == "none", used to generate jY basis
coefs_tot = [_coef_gen(rcut, ecut, len(orb['nzeta']) - 1) for orb in siab_settings['orbitals']]

# save orbitals
#################
# save orbitals #
#################
for ilev, coefs in enumerate(coefs_tot): # loop over different levels...
folder = "_".join([elem, f"{rcut}au", f"{ecut}Ry"]) # because the concept of "level" is not clear
for coefs_it in coefs: # loop over different atom types
Expand Down Expand Up @@ -754,6 +772,43 @@ def _nzeta_analysis(folder, count_thr = 1e-1, itype = 0):

class TestAPI(unittest.TestCase):

def test_coef_gen(self):

rcut = 3.0
ecut = 10.0
lmax = 3
coefs = _coef_gen(rcut, ecut, lmax)
self.assertEqual(len(coefs), 1) # always only one atomtype
self.assertEqual(len(coefs[0]), lmax + 1) # lmax + 1 orbitals
for l in range(lmax + 1):
kirk0830 marked this conversation as resolved.
Show resolved Hide resolved
dim1 = len(coefs[0][l])
dim2 = len(coefs[0][l][0])
self.assertEqual(dim1, dim2)
self.assertEqual(coefs[0][l], np.eye(dim1).tolist())

def test_band_indexing(self):

folders = [["folder1", "folder2"], ["folder3", "folder4"]]
# test single folder
self.assertEqual(_band_indexing(10, [0], folders), 10)
# test multiple folders
self.assertEqual(_band_indexing([10, 12], [0, 1], folders), [range(10), range(10), range(12), range(12)])

def test_make_guess(self):

nzeta = [[3, 3, 2], [2, 2, 1], [1, 1]]
initdir = "initdir"
guess = _make_guess(nzeta, initdir, True, True)
self.assertEqual(guess["nzeta"], [3, 3, 2])
self.assertEqual(guess["diagnosis"], True)
self.assertEqual(guess["outdir"], "initdir")

nzeta = [[3, 1, 2], [2, 2, 1], [4, 1]]
guess = _make_guess(nzeta, initdir, False, True)
self.assertEqual(guess["nzeta"], [4, 2, 2])
self.assertEqual(guess["diagnosis"], True)
self.assertEqual(guess["orb_mat"], "initdir")

def test_orb_matrices(self):

test_folder = "test_orb_matrices"
Expand Down