forked from thu-ml/zhusuan
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gaussian.py
93 lines (80 loc) · 3.08 KB
/
gaussian.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import tensorflow as tf
import zhusuan as zs
@zs.meta_bayesian_net()
def gaussian(n_x, stdev, n_particles):
bn = zs.BayesianNet()
bn.normal('x', tf.zeros([n_x]), std=stdev, n_samples=n_particles,
group_ndims=1)
return bn
if __name__ == "__main__":
tf.set_random_seed(1)
# Define model parameters
n_x = 1
# n_x = 10
stdev = 1 / (np.arange(n_x, dtype=np.float32) + 1)
# Define HMC parameters
kernel_width = 0.1
n_chains = 1000
n_iters = 200
burnin = n_iters // 2
n_leapfrogs = 5
# Build the computation graph
model = gaussian(n_x, stdev, n_chains)
adapt_step_size = tf.placeholder(tf.bool, shape=[], name="adapt_step_size")
adapt_mass = tf.placeholder(tf.bool, shape=[], name="adapt_mass")
hmc = zs.HMC(step_size=1e-3, n_leapfrogs=n_leapfrogs,
adapt_step_size=adapt_step_size, adapt_mass=adapt_mass,
target_acceptance_rate=0.9)
x = tf.Variable(tf.zeros([n_chains, n_x]), trainable=False, name='x')
sample_op, hmc_info = hmc.sample(model, {}, {'x': x})
# Run the inference
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
samples = []
print('Sampling...')
for i in range(n_iters):
_, x_sample, acc, ss = sess.run(
[sample_op, hmc_info.samples['x'], hmc_info.acceptance_rate,
hmc_info.updated_step_size],
feed_dict={adapt_step_size: i < burnin // 2,
adapt_mass: i < burnin // 2})
print('Sample {}: Acceptance rate = {}, updated step size = {}'
.format(i, np.mean(acc), ss))
if i >= burnin:
samples.append(x_sample)
print('Finished.')
samples = np.vstack(samples)
# Check & plot the results
print('Expected mean = {}'.format(np.zeros(n_x)))
print('Sample mean = {}'.format(np.mean(samples, 0)))
print('Expected stdev = {}'.format(stdev))
print('Sample stdev = {}'.format(np.std(samples, 0)))
print('Relative error of stdev = {}'.format(
(np.std(samples, 0) - stdev) / stdev))
def kde(xs, mu, batch_size):
mu_n = len(mu)
assert mu_n % batch_size == 0
xs_row = np.expand_dims(xs, 1)
ys = np.zeros(xs.shape)
for b in range(mu_n // batch_size):
mu_col = np.expand_dims(mu[b * batch_size:(b + 1) * batch_size], 0)
ys += (1 / np.sqrt(2 * np.pi) / kernel_width) * \
np.mean(np.exp((-0.5 / kernel_width ** 2) *
np.square(xs_row - mu_col)), 1)
ys /= (mu_n / batch_size)
return ys
if n_x == 1:
xs = np.linspace(-5, 5, 1000)
ys = kde(xs, np.squeeze(samples), n_chains)
f, ax = plt.subplots()
ax.plot(xs, ys)
ax.plot(xs, stats.norm.pdf(xs, scale=stdev[0]))
plt.show()