# -*- coding: utf-8 -*-
"""Routines to help do Green-Kubo and Helfand moments analysis."""
import logging
import pprint
import warnings
import numpy as np
from scipy.integrate import cumulative_trapezoid
from scipy.optimize import curve_fit, OptimizeWarning
import statsmodels.tsa.stattools as stattools
logger = logging.getLogger("thermal_conductivity")
tensor_labels = [
("xx", "red", "rgba(255,0,0,0.1)"),
("yy", "green", "rgba(0,255,0,0.1)"),
("zz", "blue", "rgba(0,0,255,0.1)"),
("xy", "rgb(127,127,0)", "rgba(127,127,0,0.1)"),
("xz", "rgb(255,0,255)", "rgba(255,0,255,0.1)"),
("yz", "rgb(0,255,255)", "rgba(0,255,255,0.1)"),
]
a1 = 0
tau1 = 0
[docs]
def moving_average(a, n):
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1 :] / n
[docs]
def frequencies(data):
ps = np.abs(np.fft.rfft(data)) ** 2
# time_step = 1 / 100
freqs = np.fft.rfftfreq(data.size)
idx = np.argsort(freqs)
for i in idx[0:500]:
print(f"{freqs[i]:.4f} {ps[i]:12.0f}")
[docs]
def exp_1(t, a, tau):
return a * (1 - np.exp(-t / tau))
[docs]
def exp_1a(t, a2, tau2):
global a1, tau1
return a1 * (1 - np.exp(-t / tau1)) + a2 * (1 - np.exp(-t / tau2))
[docs]
def exp_2(t, a1, tau1, a2, tau2):
return a1 * (1 - np.exp(-t / tau1)) + a2 * (1 - np.exp(-t / tau2))
[docs]
def axb(x, a, b):
return a * x + b
[docs]
def fit_h(x, a, b, tau):
return a * (tau * np.exp(-x / tau) + x) + b
[docs]
def acf_err(acf):
"""The standard error of the autocorrelation function.
Copied from statsmodels.tsa.stattools.acf
"""
nobs = acf.shape[0]
varacf = np.ones_like(acf) / nobs
varacf[0] = 0
varacf[1] = 1.0 / nobs
varacf[2:] *= 1 + 2 * np.cumsum(acf[1:-1] ** 2)
std = np.sqrt(varacf)
return std
[docs]
def ccf_err(acf1, acf2):
"""The standard error of crosscorrelation functions.
Copied from statsmodels.tsa.stattools.acf
"""
nobs = acf1.shape[0]
varccf = np.ones_like(acf1) / nobs
varccf[0] = 0
varccf[1] = 1.0 / nobs
varccf[2:] *= 1 + 2 * np.cumsum(acf1[1:-1] * acf2[1:-1])
std = np.sqrt(varccf)
return std
[docs]
def create_correlation_functions(J, m=None):
"""Create the correlation functions of the heat fluxes.
Parameters
----------
J : numpy.ndarray(3, n)
The heat fluxes in x, y, and z
Returns
-------
numpy.ndarray(6, n)
The correlation functions
"""
if m is None:
m = J.shape[1]
ccf = np.zeros((6, m))
integral = np.zeros((6, m))
ccf[0] = stattools.ccovf(J[0], J[0])[:m]
ccf[1] = stattools.ccovf(J[1], J[1])[:m]
ccf[2] = stattools.ccovf(J[2], J[2])[:m]
ccf[3] = stattools.ccovf(J[0], J[1])[:m]
ccf[4] = stattools.ccovf(J[0], J[2])[:m]
ccf[5] = stattools.ccovf(J[1], J[2])[:m]
integral[0] = cumulative_trapezoid(ccf[0], initial=0.0)
integral[1] = cumulative_trapezoid(ccf[1], initial=0.0)
integral[2] = cumulative_trapezoid(ccf[2], initial=0.0)
integral[3] = cumulative_trapezoid(ccf[3], initial=0.0)
integral[4] = cumulative_trapezoid(ccf[4], initial=0.0)
integral[5] = cumulative_trapezoid(ccf[5], initial=0.0)
return ccf, integral
[docs]
def create_helfand_moments(J, m=None):
"""Create the Helfand moments from heat fluxes.
Parameters
----------
J : numpy.ndarray(3, n)
The heat fluxes in x, y, and z
m : int
The length of the Helfand moments wanted
Returns
-------
numpy.ndarray(6, m)
The Helfand moments
"""
n = J.shape[1]
if m is None:
m = min(n // 20, 10000)
M = np.zeros((6, m))
for i in range(n - m):
Ix = cumulative_trapezoid(J[0, i : m + i], initial=0.0)
Iy = cumulative_trapezoid(J[1, i : m + i], initial=0.0)
Iz = cumulative_trapezoid(J[2, i : m + i], initial=0.0)
M[0, :] += Ix * Ix
M[1, :] += Iy * Iy
M[2, :] += Iz * Iz
M[3, :] += Ix * Iy
M[4, :] += Ix * Iz
M[5, :] += Iy * Iz
M /= n - m
return M
[docs]
def fit_green_kubo_integral(y, xs, sigma=None):
"""Find the best a * (1 - exp(-tau/t)) form.
Parameters
----------
y : [float] or numpy.ndarray()
The integral of the correlation functions
xs : [float]
The time (x) coordinate
sigma : [float] or numpy.ndarray()
Optional standard error of y
Returns
-------
a : [float]
The list of coefficients
tau : [float]
The time constants
a_err : [float]
The standard error of the coefficients.
tau_err : [float]
The standard error of the time constants
n : int
The point in the x (time) vector that is 3*tau
"""
# frequencies(y)
dx = xs[1] - xs[0]
width = int(0.4 / dx)
ma = moving_average(y, width)
# Find the initial, steep rise. Ignore first couple points.
for i in range(2, len(ma) // 2):
if ma[i] > 0.9 * ma[i + width]:
break
npts = i + width
tau_guess = xs[npts]
a_guess = ma[i]
npts *= 20
if 2 * npts > y.size:
npts = y.size // 2
sigma0 = sigma + np.average(sigma[0 : y.size // 10])
# First fit with single exponential
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
warnings.simplefilter("ignore", OptimizeWarning)
popt1, pcov1, infodict1, msg1, ierr1 = curve_fit(
exp_1,
xs[1:npts],
y[1:npts],
full_output=True,
sigma=sigma0[1:npts],
absolute_sigma=True,
p0=[a_guess, tau_guess],
)
err = np.sqrt(np.diag(pcov1)).tolist()
a1 = popt1[0]
tau1 = popt1[1]
# find a range for the next fit to keep it a "good" part of the data
i_min = int(tau1 / dx / 2) # tau / 2
i_max = int(tau1 / dx * 20) # 20 * tau1
# And add a second exponential
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
warnings.simplefilter("ignore", OptimizeWarning)
popt, pcov, infodict, msg, ierr = curve_fit(
exp_2,
xs[i_min:i_max],
y[i_min:i_max],
full_output=True,
sigma=sigma[i_min:i_max],
absolute_sigma=True,
p0=[a1, tau1, 0.1 * a1, 5 * tau1],
)
err = np.sqrt(np.diag(pcov)).tolist()
a = [popt[0], popt[2]]
a_err = [err[0], err[2]]
tau = [popt[1], popt[3]]
tau_err = [err[1], err[3]]
kappa = a[0] + a[1]
kappa_err = a_err[0] + a_err[1]
if kappa < 0 or abs(kappa) > 2 * abs(a1): # Shouldn't change this much!
err = np.sqrt(np.diag(pcov1)).tolist()
a = [popt1[0]]
a_err = [err[0]]
tau = [popt1[1]]
tau_err = [err[1]]
kappa = a[0]
kappa_err = a_err[0]
except Exception:
err = np.sqrt(np.diag(pcov1)).tolist()
a = [popt1[0]]
a_err = [err[0]]
tau = [popt1[1]]
tau_err = [err[1]]
kappa = a[0]
kappa_err = a_err[0]
# Find point for 3 * tau
tmp = max(tau)
nf = int(tmp / dx) * 20
if nf > len(xs):
nf = len(xs)
fy = []
if len(a) == 1:
for t in xs[1:nf]:
fy.append(exp_1(t, a[0], tau[0]))
else:
for t in xs[1:nf]:
fy.append(exp_2(t, a[0], tau[0], a[1], tau[1]))
return kappa, kappa_err, a, a_err, tau, tau_err, xs[1:nf], fy
[docs]
def fit_helfand_moment(y, xs, sigma=None, start=1, logger=logger):
"""Find the best linear fit to longest possible segment.
Parameters
----------
y : [float] or numpy.ndarray()
The Helfand moment
xs : [float]
The time (x) coordinate
sigma : [float] or numpy.ndarray()
Optional standard error of y
Returns
-------
slope : float
The fit slope.
stderr : float
The 95% standard error of the slope
xs : [float]
The x values (time) for the fit curve
ys : [float]
The y values for the fit curve.
"""
dx = xs[1] - xs[0]
# We know the curves curve near the origin, so ignore the first part
i = int(start / dx)
if i > len(y):
i = len(y) // 2
popt, pcov, infodict, msg, ierr = curve_fit(
axb,
xs[i:],
y[i:],
full_output=True,
sigma=sigma[i:],
absolute_sigma=True,
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("")
logger.debug(f"{popt=}")
logger.debug(f"{pcov=}")
logger.debug(pprint.pformat(infodict, compact=True))
logger.debug(f"{msg=}")
logger.debug(f"{ierr=}")
slope = float(popt[0])
b = float(popt[1])
err = float(np.sqrt(np.diag(pcov)[0]))
ys = []
for x in xs[i:]:
ys.append(axb(x, slope, b))
return slope, err, xs[i:], ys
[docs]
def get_helfand_slope(y, xs, sigma=None):
"""Get the instantaneous slope of the Helfand moments
Parameters
----------
y : [float] or numpy.ndarray()
The Helfand moment
xs : [float]
The time (x) coordinate
sigma : [float] or numpy.ndarray()
Optional standard error of y
Returns
-------
slope : float
The fit slope.
stderr : float
The 95% standard error of the slope
xs : [float]
The x values (time) for the fit curve
ys : [float]
The y values for the fit curve.
"""
n = len(y)
dx = xs[1] - xs[0]
slope = np.zeros_like(y)
slope[2:] = (y[2:] - y[0 : n - 2]) / (2 * dx)
new_x = xs
new_sigma = np.zeros_like(y)
new_sigma[2:] = sigma[2:] + sigma[0 : n - 2]
new_sigma[0] = new_sigma[3]
new_sigma[1] = new_sigma[3]
return slope, new_x, new_sigma
[docs]
def plot_correlation_functions(
figure, CCF, ts, err=None, fit=None, labels=tensor_labels
):
"""Create a plot for the heat flux cross-correlation functions.
Parameters
----------
figure : seamm_util.Figure
The figure that contains the plots.
CCF : numpy.mdarray(6, m)
The cross-correlation functions in W^2/m^4
ts : [float]
The times associated with the CCF, in ps
err : numpy.ndarray(6, m)
The standard errors of the CCF (optional)
fit :
The fit parameters for any fit of the CFF (optional)
"""
plot = figure.add_plot("CCF")
x_axis = plot.add_axis("x", label="t (ps)")
y_axis = plot.add_axis("y", label="J0Jt (W^2/m^4)", anchor=x_axis)
x_axis.anchor = y_axis
for i, tmp in enumerate(labels):
label, color, colora = tmp
if fit is not None:
hover = f"{label} = {fit[i]['kappa']:.3f} ± {fit[i]['stderr']:.3f} W/m/K"
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"fit{label}",
hovertemplate=hover,
x=fit[i]["xs"],
xlabel="t",
xunits="ps",
y=fit[i]["ys"],
ylabel=f"fit{label}",
yunits="W/m/K*ps",
color=color,
dash="dash",
width=3,
)
if err is not None:
errs = np.concatenate((CCF[i] + err[i], CCF[i, ::-1] - err[i, ::-1]))
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"±{label}",
x=ts + ts[::-1],
xlabel="t",
xunits="ps",
y=errs.tolist(),
ylabel=f"±{label}",
yunits="W^2/m^4",
color=colora,
fill="toself",
visible="legendonly",
)
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"J0Jt{label}",
x=ts,
xlabel="t",
xunits="ps",
y=CCF[i].tolist(),
ylabel=f"J0Jt{label}",
yunits="W^2/m^4",
color=color,
)
return plot
[docs]
def plot_GK_integrals(
figure, x, ts, err=None, fit=None, labels=tensor_labels, _range=None
):
"""Create a plot of the Green-Kubo integrals
Parameters
----------
figure : seamm_util.Figure
The figure that contains the plots.
x : numpy.mdarray(6, m)
The cumulative integrals in W/m/K
ts : [float]
The times associated with the integrals, in ps
err : numpy.ndarray(6, m)
The standard errors of the integrals (optional)
fit :
The fit parameters for any fit of the CFF (optional)
_range : [float]
The range of the x-axis in terms of units in x.
"""
plot = figure.add_plot("GKI")
x_axis = plot.add_axis("x", label="t (ps)")
y_axis = plot.add_axis("y", label="Kappa (W/m/K)", anchor=x_axis)
x_axis.anchor = y_axis
if _range is not None:
x_axis["range"] = _range
for i, tmp in enumerate(labels):
label, color, colora = tmp
if fit is not None and i < len(fit):
hover = f"{label} = {fit[i]['kappa']:.3f} ± {fit[i]['stderr']:.3f} W/m/K"
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"fit{label}",
hovertemplate=hover,
x=fit[i]["xs"],
xlabel="t",
xunits="ps",
y=fit[i]["ys"],
ylabel=f"fit{label}",
yunits="W/m/K",
color=color,
dash="dash",
width=3,
)
if err is not None:
errs = np.concatenate((x[i] + err[i], x[i, ::-1] - err[i, ::-1]))
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"±{label}",
x=ts + ts[::-1],
xlabel="t",
xunits="ps",
y=errs.tolist(),
ylabel=f"±{label}",
yunits="W/m/K",
color=colora,
fill="toself",
visible="legendonly",
)
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"K{label}",
x=ts,
xlabel="t",
xunits="ps",
y=x[i].tolist(),
ylabel=f"K{label}",
yunits="W/m/K",
color=color,
)
return plot
[docs]
def plot_helfand_moments(figure, M, ts, err=None, fit=None, labels=tensor_labels):
"""Create a plot for the Helfand moments.
Parameters
----------
figure : seamm_util.Figure
The figure that contains the plots.
M : numpy.mdarray(6, m)
The Helfand moments, in W/m/K*ps
ts : [float]
The times associated with the moments, in ps
"""
plot = figure.add_plot("M")
x_axis = plot.add_axis("x", label="Time (ps)")
y_axis = plot.add_axis("y", label="M (W/m/K*ps)", anchor=x_axis)
x_axis.anchor = y_axis
for i, tmp in enumerate(labels):
label, color, colora = tmp
if fit is not None:
hover = f"κ{label} = {fit[i]['kappa']:.3f} ± {fit[i]['stderr']:.3f} W/m/K"
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"fit{label}",
hovertemplate=hover,
x=fit[i]["xs"],
xlabel="t",
xunits="ps",
y=fit[i]["ys"],
ylabel=f"fit{label}",
yunits="W/m/K*ps",
color=color,
dash="dash",
width=3,
)
if err is not None:
errs = np.concatenate((M[i] + err[i], M[i, ::-1] - err[i, ::-1]))
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"±{label}",
x=ts + ts[::-1],
xlabel="t",
xunits="ps",
y=errs.tolist(),
ylabel=f"±{label}",
yunits="W/m/K*ps",
color=colora,
fill="toself",
visible="legendonly",
)
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"M{label}",
x=ts,
xlabel="t",
xunits="ps",
y=M[i, :].tolist(),
ylabel=f"M{label}",
yunits="W/m/K*ps",
color=color,
)
return plot
[docs]
def plot_helfand_slopes(
figure, x, ts, err=None, fit=None, labels=tensor_labels, _range=None
):
"""Create a plot of the slope of the Helfand moments
Parameters
----------
figure : seamm_util.Figure
The figure that contains the plots.
x : numpy.mdarray(6, m)
The slope in W/m/K
ts : [float]
The times associated with the points, in ps
err : numpy.ndarray(6, m)
The standard errors of the points (optional)
fit :
The fit parameters for any fit of the slope (optional)
_range : [float]
The range of the x-axis in terms of units in x.
"""
plot = figure.add_plot("Slope")
x_axis = plot.add_axis("x", label="t (ps)")
y_axis = plot.add_axis("y", label="Kappa (W/m/K)", anchor=x_axis)
x_axis.anchor = y_axis
if _range is not None:
x_axis["range"] = _range
for i, tmp in enumerate(labels):
label, color, colora = tmp
if fit is not None and i < len(fit):
hover = f"{label} = {fit[i]['kappa']:.3f} ± {fit[i]['stderr']:.3f} W/m/K"
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"fit{label}",
hovertemplate=hover,
x=fit[i]["xs"],
xlabel="t",
xunits="ps",
y=fit[i]["ys"],
ylabel=f"fit{label}",
yunits="W/m/K",
color=color,
dash="dash",
width=3,
)
if err is not None:
errs = np.concatenate((x[i] + err[i], x[i][::-1] - err[i][::-1]))
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"±{label}",
x=ts + ts[::-1],
xlabel="t",
xunits="ps",
y=errs.tolist(),
ylabel=f"±{label}",
yunits="W/m/K",
color=colora,
fill="toself",
visible="legendonly",
)
plot.add_trace(
x_axis=x_axis,
y_axis=y_axis,
name=f"K{label}",
x=ts,
xlabel="t",
xunits="ps",
y=x[i].tolist(),
ylabel=f"K{label}",
yunits="W/m/K",
color=color,
)
return plot