-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
169 lines (148 loc) · 7.12 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import itertools
import numpy as np
from numpy.lib.recfunctions import structured_to_unstructured
from scipy.integrate import simpson
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
def generateWindows(a, bins):
aPadded = np.pad(a, bins, mode="wrap")
aWindows = np.empty((len(a), 2*bins+1))
for i in range(len(a)):
aWindows[i] = aPadded[i:i+2*bins+1]
return aWindows
def c1(model, rho, dx=0.01, c2=False):
"""
Infer the one-body direct correlation profile from a given density profile with a given neural correlation functional.
model: The neural correlation functional
rho: The density profile
dx: The discretization of the input layer of the model
c2: If False, only return c1(x). If True, return both c1 as well as the corresponding two-body direct correlation function c2(x, x') which is obtained via autodifferentiation. If 'unstacked', give c2 as a function of x and x-x', i.e. as obtained naturally from the model.
"""
inputBins = model.input.shape[1]
windowBins = (inputBins - 1) // 2
rhoWindows = generateWindows(rho, windowBins).reshape(rho.shape[0], inputBins, 1)
if c2:
rhoWindows = tf.Variable(rhoWindows)
with tf.GradientTape(persistent=True, watch_accessed_variables=False) as tape:
tape.watch(rhoWindows)
result = model(rhoWindows)
jacobiWindows = tape.batch_jacobian(result, rhoWindows).numpy().squeeze() / dx
c1_result = result.numpy().flatten()
if c2 == "unstacked":
return c1_result, jacobiWindows
c2_result = np.row_stack([np.roll(np.pad(jacobiWindows[i], (0,rho.shape[0]-inputBins)), i-windowBins) for i in range(rho.shape[0])])
return c1_result, c2_result
return model.predict_on_batch(rhoWindows).flatten()
def Fexc(model, rho, dx=0.01):
"""
Calculate the excess free energy Fexc for a given density profile with functional line integration.
model: The neural correlation functional
rho: The density profile
dx: The discretization of the input layer of the model
"""
alphas = np.linspace(0, 1, 30)
integrands = np.empty_like(alphas)
for i, alpha in enumerate(alphas):
integrands[i] = np.sum(rho * c1(model, alpha * rho)) * dx
Fexc = -simpson(integrands, alphas)
return Fexc
class DataGenerator(keras.utils.Sequence):
def __init__(self, simData, batch_size=32, shuffle=True, inputKeys=["rho"], outputKeys=["c1"], windowSigma=2.0):
self.simData = simData
self.inputKeys = inputKeys
self.outputKeys = outputKeys
self.windowSigma = windowSigma
firstSimData = list(self.simData.values())[0]
self.dz = 2 * firstSimData["z"][0]
self.simDataBins = len(firstSimData["z"])
self.windowBins = int(round(self.windowSigma/self.dz))
self.inputData = {}
self.outputData = {}
self.validBins = {}
for simId in self.simData.keys():
valid = np.full(self.simDataBins, True)
for k in self.outputKeys:
valid = np.logical_and(valid, ~np.isnan(self.simData[simId][k]))
self.validBins[simId] = np.flatnonzero(valid)
self.inputData[simId] = structured_to_unstructured(np.pad(self.simData[simId][self.inputKeys], self.windowBins, mode="wrap"))
self.outputData[simId] = structured_to_unstructured(np.pad(self.simData[simId][self.outputKeys], self.windowBins, mode="wrap"))
self.batch_size = batch_size
self.inputShape = (2*self.windowBins+1, len(self.inputKeys))
self.outputShape = (len(self.outputKeys),)
self.shuffle = shuffle
self.on_epoch_end()
print(f"Initialized DataGenerator from {len(self.simData)} simulations which will yield up to {len(self.indices)} input/output samples in batches of {self.batch_size}")
def __len__(self):
return int(np.floor(len(self.indices) / self.batch_size))
def __getitem__(self, index):
ids = self.indices[index*self.batch_size:(index+1)*self.batch_size]
X = np.empty((self.batch_size, *self.inputShape))
y = np.empty((self.batch_size, *self.outputShape))
for b, (simId, i) in enumerate(ids):
i += self.windowBins
X[b] = self.inputData[simId][i-self.windowBins:i+self.windowBins+1]
y[b] = self.outputData[simId][i]
return X, y
def on_epoch_end(self):
self.indices = []
for simId in self.simData.keys():
self.indices.extend(list(itertools.product([simId], list(self.validBins[simId]))))
if self.shuffle == True:
np.random.default_rng().shuffle(self.indices)
class BulkCorrelationEvaluator:
"""
This class provides utilities to calculate c2 and c3 of the bulk fluid.
"""
def __init__(self, model, dx, rhob=0.7):
self.model = model
self.dx = dx
self.rhob = rhob
self.inputBins = self.model.input.shape[1]
self.channels = self.model.input.shape[2]
rhobWindow = np.full(self.inputBins, self.rhob)
inputBinsHalf = self.inputBins // 2
self.xWindow = self.dx * np.linspace(-inputBinsHalf, inputBinsHalf, self.inputBins)
if self.channels == 1:
inputWindow = rhobWindow.reshape(1, self.inputBins, 1)
elif self.channels == 2:
inputWindow = np.c_[self.xWindow, rhobWindow].reshape(1, self.inputBins, 2)
self.inputWindow = tf.Variable(inputWindow)
def c2(self):
with tf.GradientTape(persistent=True, watch_accessed_variables=False) as tape:
tape.watch(self.inputWindow)
result = self.model(self.inputWindow)
grad = tape.gradient(result, self.inputWindow)
return tf.squeeze(grad).numpy() / self.dx
def c3(self):
with tf.GradientTape(persistent=True, watch_accessed_variables=False) as tape1:
tape1.watch(self.inputWindow)
with tf.GradientTape(persistent=True, watch_accessed_variables=False) as tape2:
tape2.watch(self.inputWindow)
result = self.model(self.inputWindow)
c2 = tape2.gradient(result, self.inputWindow)
c3 = tape1.jacobian(c2, self.inputWindow)
return tf.squeeze(c3).numpy() / self.dx**2
class PlotBulkCorrelationCallback(keras.callbacks.Callback):
"""
This class implements a callback which can be used during training to monitor the quality of bulk correlations.
"""
def __init__(self, when="on_batch_end", every=1, options=None):
super().__init__()
setattr(self, when, self._doPlot)
self.every = every
self.options = options
plt.ion()
self.fig, self.ax = plt.subplots()
self.c2Graph, = self.ax.plot(0)
def _doPlot(self, index, logs=None):
if index % self.every != 0:
return
c2 = self.testProfilesEvaluator.c2()
self.c2Graph.set_data(self.testProfilesEvaluator.xWindow, c2)
self.ax.relim()
self.ax.autoscale_view()
self.fig.canvas.draw()
self.fig.canvas.flush_events()
def on_epoch_begin(self, index, logs=None):
self.testProfilesEvaluator = BulkCorrelationEvaluator(self.model, **self.options)