forked from ColCarroll/minimc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathminimc.py
158 lines (137 loc) · 5.23 KB
/
minimc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Implementations of samplers for casual use.
import numpy as np
import scipy.stats as st
from tqdm import tqdm
from .integrators import leapfrog
__all__ = ["hamiltonian_monte_carlo"]
def hamiltonian_monte_carlo(
n_samples,
potential,
initial_position,
initial_potential=None,
initial_potential_grad=None,
tune=500,
path_len=1,
initial_step_size=0.1,
integrator=leapfrog,
max_energy_change=1000.0,
):
"""Run Hamiltonian Monte Carlo sampling.
Parameters
----------
n_samples : int
Number of samples to return
negative_log_prob : callable
The negative log probability to sample from
initial_position : np.array
A place to start sampling from.
tune: int
Number of iterations to run tuning
path_len : float
How long each integration path is. Smaller is faster and more correlated.
initial_step_size : float
How long each integration step is. This will be tuned automatically.
max_energy_change : float
The largest tolerable integration error. Transitions with energy changes
larger than this will be declared divergences.
Returns
-------
np.array
Array of length `n_samples`.
"""
initial_position = np.array(initial_position)
if initial_potential is None or initial_potential_grad is None:
initial_potential, initial_potential_grad = potential(initial_position)
# collect all our samples in a list
samples = [initial_position]
# Keep a single object for momentum resampling
momentum = st.norm(0, 1)
step_size = initial_step_size
step_size_tuning = DualAveragingStepSize(step_size)
# If initial_position is a 10d vector and n_samples is 100, we want 100 x 10 momentum draws
# we can do this in one call to np.random.normal, and iterate over rows
size = (n_samples + tune,) + initial_position.shape[:1]
for idx, p0 in tqdm(enumerate(momentum.rvs(size=size)), total=size[0]):
# Integrate over our path to get a new position and momentum
q_new, p_new, final_V, final_dVdq = integrator(
samples[-1],
p0,
initial_potential_grad,
potential,
path_len=2
* np.random.rand()
* path_len, # We jitter the path length a bit
step_size=step_size,
)
start_log_p = np.sum(momentum.logpdf(p0)) - initial_potential
new_log_p = np.sum(momentum.logpdf(p_new)) - final_V
energy_change = new_log_p - start_log_p
# Check Metropolis acceptance criterion
p_accept = min(1, np.exp(energy_change))
if np.random.rand() < p_accept:
samples.append(q_new)
initial_potential = final_V
initial_potential_grad = final_dVdq
else:
samples.append(np.copy(samples[-1]))
if idx < tune - 1:
step_size, _ = step_size_tuning.update(p_accept)
elif idx == tune - 1:
_, step_size = step_size_tuning.update(p_accept)
return np.array(samples[1 + tune :])
class DualAveragingStepSize:
def __init__(
self,
initial_step_size,
target_accept=0.8,
gamma=0.05,
t0=10.0,
kappa=0.75,
):
"""Tune the step size to achieve a desired target acceptance.
Uses stochastic approximation of Robbins and Monro (1951), described in
Hoffman and Gelman (2013), section 3.2.1, and using those default values.
Parameters
----------
initial_step_size: float > 0
Used to set a reasonable value for the stochastic step to drift towards
target_accept: float in (0, 1)
Will try to find a step size that accepts this percent of proposals
gamma: float
How quickly the stochastic step size reverts to a value mu
t0: float > 0
Larger values stabilize step size exploration early, while perhaps slowing
convergence
kappa: float in (0.5, 1]
The smaller kappa is, the faster we forget earlier step size iterates
"""
self.mu = np.log(10 * initial_step_size)
self.target_accept = target_accept
self.gamma = gamma
self.t = t0
self.kappa = kappa
self.error_sum = 0
self.log_averaged_step = 0
def update(self, p_accept):
"""Propose a new step size.
This method returns both a stochastic step size and a dual-averaged
step size. While tuning, the HMC algorithm should use the stochastic
step size and call `update` every loop. After tuning, HMC should use
the dual-averaged step size for sampling.
Parameters
----------
p_accept: float
The probability of the previous HMC proposal being accepted
Returns
-------
float, float
A stochastic step size, and a dual-averaged step size
"""
self.error_sum += self.target_accept - p_accept
log_step = self.mu - self.error_sum / (np.sqrt(self.t) * self.gamma)
eta = self.t ** -self.kappa
self.log_averaged_step = (
eta * log_step + (1 - eta) * self.log_averaged_step
)
self.t += 1
return np.exp(log_step), np.exp(self.log_averaged_step)