# -*- coding: utf-8 -*-
"""Non-graphical part of the Thermal Conductivity step in a SEAMM flowchart
"""
import json
import logging
from math import log10, ceil
from pathlib import Path
import pkg_resources
import sys
import traceback
import numpy as np
import pandas
from tabulate import tabulate
from .analysis import (
create_correlation_functions,
create_helfand_moments,
fit_green_kubo_integral,
fit_helfand_moment,
get_helfand_slope,
plot_correlation_functions,
plot_GK_integrals,
plot_helfand_moments,
plot_helfand_slopes,
)
from .thermal_conductivity_parameters import ThermalConductivityParameters
import thermal_conductivity_step
import molsystem
import seamm
import seamm_util
from seamm_util import Q_
import seamm_util.printing as printing
from seamm_util.printing import FormattedText as __
# In addition to the normal logger, two logger-like printing facilities are
# defined: "job" and "printer". "job" send output to the main job.out file for
# the job, and should be used very sparingly, typically to echo what this step
# will do in the initial summary of the job.
#
# "printer" sends output to the file "step.out" in this steps working
# directory, and is used for all normal output from this step.
logger = logging.getLogger(__name__)
job = printing.getPrinter()
printer = printing.getPrinter("Thermal Conductivity")
# Add this module's properties to the standard properties
path = Path(pkg_resources.resource_filename(__name__, "data/"))
csv_file = path / "properties.csv"
if path.exists():
molsystem.add_properties_from_file(csv_file)
[docs]
def fmt_err(value, err, precision=2):
try:
decimals = -ceil(log10(err)) + precision
except Exception:
e = "--"
try:
v = f"{value:.2f}"
except Exception:
v = value
else:
if decimals < 0:
decimals = 0
fmt = f".{decimals}f"
e = f"{err:{fmt}}"
try:
v = f"{value:{fmt}}"
except Exception:
v = value
return v, e
[docs]
class ThermalConductivity(seamm.Node):
"""
The non-graphical part of a Thermal Conductivity step in a flowchart.
Attributes
----------
parser : configargparse.ArgParser
The parser object.
options : tuple
It contains a two item tuple containing the populated namespace and the
list of remaining argument strings.
subflowchart : seamm.Flowchart
A SEAMM Flowchart object that represents a subflowchart, if needed.
parameters : ThermalConductivityParameters
The control parameters for Thermal Conductivity.
See Also
--------
TkThermalConductivity,
ThermalConductivity, ThermalConductivityParameters
"""
def __init__(
self,
flowchart=None,
title="Thermal Conductivity",
namespace="org.molssi.seamm",
extension=None,
logger=logger,
):
"""A step for Thermal Conductivity in a SEAMM flowchart.
You may wish to change the title above, which is the string displayed
in the box representing the step in the flowchart.
Parameters
----------
flowchart: seamm.Flowchart
The non-graphical flowchart that contains this step.
title: str
The name displayed in the flowchart.
namespace : str
The namespace for the plug-ins of the subflowchart
extension: None
Not yet implemented
logger : Logger = logger
The logger to use and pass to parent classes
Returns
-------
None
"""
logger.debug(f"Creating Thermal Conductivity {self}")
self.subflowchart = seamm.Flowchart(
parent=self, name="Thermal Conductivity", namespace=namespace
)
super().__init__(
flowchart=flowchart,
title="Thermal Conductivity",
extension=extension,
module=__name__,
logger=logger,
)
self._metadata = thermal_conductivity_step.metadata
self.parameters = ThermalConductivityParameters()
self.tensor_labels = [
("xx", "red", "rgba(255,0,0,0.1)"),
("yy", "green", "rgba(0,255,0,0.1)"),
("zz", "blue", "rgba(0,0,255,0.1)"),
("xy", "rgb(127,127,0)", "rgba(127,127,0,0.1)"),
("xz", "rgb(255,0,255)", "rgba(255,0,255,0.1)"),
("yz", "rgb(0,255,255)", "rgba(0,255,255,0.1)"),
]
self._file_handler = None
# Various data
self.J = None # The heat fluxes, nruns x 3 x n
self.V = [] # The volumes of the runs
self.T = [] # The temperatures of the runs
self.timestep = [] # The timestep in fs for the run
self.Ms = [] # Helfand moments per run
self.M = None # The Helfand moments, 6 x m
self.M_err = None # The stderr on HM, 6 x m
self.Jcfs = [] # The ccf's of J, 6 x n, per run
self.Jcf = None # The ccf's of J, 6 x n
self.Jcf_err = None # The stderr of ccf's, 6 x n
self.GK_integrals = [] # Cumulative integral of Jcf, per run
self.GK_integral = None # Cumulative integral of Jcf
self.GK_integral_err = None # Error of cumulative integral of Jcf
# Set our citation level to 1!
self.citation_level = 1
@property
def version(self):
"""The semantic version of this module."""
return thermal_conductivity_step.__version__
@property
def git_revision(self):
"""The git version of this module."""
return thermal_conductivity_step.__git_revision__
[docs]
def create_parser(self):
"""Setup the command-line / config file parser"""
parser_name = "thermal-conductivity-step"
parser = seamm_util.getParser()
self.subflowchart._parser = self.flowchart._parser
# Remember if the parser exists ... this type of step may have been
# found before
parser_exists = parser.exists(parser_name)
# Create the standard options, e.g. log-level
super().create_parser(name=parser_name)
if not parser_exists:
# Any options for thermal conductivity itself
parser.add_argument(
parser_name,
"--html",
action="store_true",
help="whether to write out html files for graphs, etc.",
)
# Now need to walk through the steps in the subflowchart...
self.subflowchart.reset_visited()
node = self.subflowchart.get_node("1").next()
while node is not None:
node = node.create_parser()
return self.next()
[docs]
def description_text(self, P=None, short=False):
"""Create the text description of what this step will do.
The dictionary of control values is passed in as P so that
the code can test values, etc.
Parameters
----------
P: dict
An optional dictionary of the current values of the control
parameters.
Returns
-------
str
A description of the current step.
"""
if P is None:
P = self.parameters.values_to_dict()
text = (
f"Calculate the thermal conductivity at {P['T']} using "
f"the {P['approach']} approach. {P['nruns']} runs will be averaged for "
"the final results.\n\n"
)
if not short:
# The subflowchart
self.subflowchart.root_directory = self.flowchart.root_directory
# Get the first real node
node = self.subflowchart.get_node("1").next()
while node is not None:
try:
text += __(node.description_text()).__str__()
except Exception as e:
print(
f"Error describing thermal_conductivity flowchart: {e} in "
f"{node}"
)
self.logger.critical(
f"Error describing thermal_conductivity flowchart: {e} in "
f"{node}"
)
raise
except: # noqa: E722
print(
"Unexpected error describing thermal_conductivity flowchart: "
f"{sys.exc_info()[0]} in {str(node)}"
)
self.logger.critical(
"Unexpected error describing thermal_conductivity flowchart: "
f"{sys.exc_info()[0]} in {str(node)}"
)
raise
text += "\n"
node = node.next()
return self.header + "\n" + __(text, **P, indent=4 * " ").__str__()
[docs]
def run(self):
"""Run a Thermal Conductivity step.
Parameters
----------
None
Returns
-------
seamm.Node
The next node object in the flowchart.
"""
next_node = super().run(printer)
# Get the values of the parameters, dereferencing any variables
P = self.parameters.current_values_to_dict(
context=seamm.flowchart_variables._data
)
# Print what we are doing
printer.important(__(self.description_text(P, short=True), indent=self.indent))
# Find the handler for job.out and set the level up
job_handler = None
out_handler = None
for handler in job.handlers:
if (
isinstance(handler, logging.FileHandler)
and "job.out" in handler.baseFilename
):
job_handler = handler
job_level = job_handler.level
job_handler.setLevel(printing.JOB)
elif isinstance(handler, logging.StreamHandler):
out_handler = handler
out_level = out_handler.level
out_handler.setLevel(printing.JOB)
# Get the first real node
first_node = self.subflowchart.get_node("1").next()
# Ensure the nodes have their options
node = first_node
while node is not None:
node.all_options = self.all_options
node = node.next()
# And the subflowchart has the executor
self.subflowchart.executor = self.flowchart.executor
# Loop over the runs
nruns = P["nruns"]
fmt = f"0{len(str(nruns))}"
for run in range(1, nruns + 1):
# Direct most output to iteration.out
run_dir = Path(self.directory) / f"run_{run:{fmt}}"
run_dir.mkdir(parents=True, exist_ok=True)
# A handler for the file
if self._file_handler is not None:
self._file_handler.close()
job.removeHandler(self._file_handler)
path = run_dir / "Run.out"
path.unlink(missing_ok=True)
self._file_handler = logging.FileHandler(path)
self._file_handler.setLevel(printing.NORMAL)
formatter = logging.Formatter(fmt="{message:s}", style="{")
self._file_handler.setFormatter(formatter)
job.addHandler(self._file_handler)
# Add the run to the ids so the directory structure is reasonable
self.flowchart.reset_visited()
self.set_subids((*self._id, f"run_{run:{fmt}}"))
# Run through the steps in the loop body
node = first_node
try:
while node is not None:
node = node.run()
except DeprecationWarning as e:
printer.normal("\nDeprecation warning: " + str(e))
traceback.print_exc(file=sys.stderr)
traceback.print_exc(file=sys.stdout)
except Exception as e:
printer.job(f"Caught exception in run {run}: {str(e)}")
with open(run_dir / "stderr.out", "a") as fd:
traceback.print_exc(file=fd)
if "continue" in P["errors"]:
continue
elif "exit" in P["errors"]:
break
else:
raise
else:
self.process_run(run, run_dir)
if True and len(self.V) > 1:
if job_handler is not None:
job_handler.setLevel(job_level)
if out_handler is not None:
out_handler.setLevel(out_level)
self.analyze(P=P, style="1-line", run=run)
if job_handler is not None:
job_handler.setLevel(printing.JOB)
if out_handler is not None:
out_handler.setLevel(printing.JOB)
self.logger.debug(f"End of run {run}")
# Remove any redirection of printing.
if self._file_handler is not None:
self._file_handler.close()
job.removeHandler(self._file_handler)
self._file_handler = None
if job_handler is not None:
job_handler.setLevel(job_level)
if out_handler is not None:
out_handler.setLevel(out_level)
# Analyze the results
self.analyze(P=P, style="full")
# Add other citations here or in the appropriate place in the code.
# Add the bibtex to data/references.bib, and add a self.reference.cite
# similar to the above to actually add the citation to the references.
return next_node
[docs]
def analyze(self, indent="", P=None, style="full", run=None, **kwargs):
"""Do any analysis of the output from this step.
Also print important results to the local step.out file using
"printer".
Parameters
----------
indent: str
An extra indentation for the output
"""
if style == "1-line":
table = {
"Run": [],
"Kxx": [],
"exx": [],
"Kyy": [],
"eyy": [],
"Kzz": [],
"ezz": [],
"Kxy": [],
"exy": [],
"Kxz": [],
"exz": [],
"Kyz": [],
"eyz": [],
}
elif style == "full":
table = {
"Method": [],
"Dir": [],
"Kappa": [],
"±": [],
"95%": [],
"Units": [],
}
dt = Q_(self.timestep[0], "fs")
ts = np.arange(self.M.shape[1]) * dt.m_as("ps") # Scale to ps
ts = ts.tolist()
# Get and plot the slopes of the Helfand moments
slopes = []
xs = []
errs = []
fit0 = []
for i in range(6):
slope, x, err = get_helfand_slope(self.M[i], ts, sigma=self.M_err[i])
slopes.append(slope)
xs.append(x)
errs.append(err)
if i < 3:
(
kappa,
kappa_err,
a,
a_err,
tau,
tau_err,
tf,
yf,
) = fit_green_kubo_integral(slope, x, sigma=err)
fit0.append(
{
"kappa": kappa,
"stderr": kappa_err,
"xs": tf,
"ys": yf,
"a": a,
"a_err": a_err,
"tau": tau,
"tau_err": tau_err,
}
)
v, e = fmt_err(kappa, 2 * kappa_err)
if style == "1-line":
if i == 0:
table["Run"].append(len(self.V))
alpha = self.tensor_labels[i][0]
table["K" + alpha].append(v)
table["e" + alpha].append(e)
elif style == "full":
table["Method"].append("Helfand Derivative" if i == 0 else "")
table["Dir"].append(self.tensor_labels[i][0])
table["Kappa"].append(v)
table["±"].append("±")
table["95%"].append(e)
table["Units"].append("W/m/K" if i == 0 else "")
else:
if style == "1-line":
# blank the off-diagonals
alpha = self.tensor_labels[i][0]
table["K" + alpha].append("")
table["e" + alpha].append("")
figure = self.create_figure(
module_path=("seamm",),
template="line.graph_template",
title="Helfand Derivatives",
)
plot_helfand_slopes(
figure, slopes, xs[0], err=errs, fit=fit0, labels=self.tensor_labels
)
figure.grid_plots("Slope")
# Write to disk
filename = "HelfandDerivatives.graph"
path = Path(self.directory) / filename
figure.dump(path)
if "html" in self.options and self.options["html"]:
path = path.with_suffix(".html")
figure.template = "line.html_template"
figure.dump(path)
# Fit the slopes
fit = []
for i in range(6):
if i < len(fit0):
start = max(fit0[i]["tau"]) * 3
else:
start = 1
if start < 1:
start = 1
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("\n\n\n**********************\n")
self.logger.debug(f"{i=}")
for v1, v2, v3 in zip(ts[0:9], self.M[i][0:9], self.M_err[i][0:9]):
self.logger.debug(f" {v1:.3f} {v2:12.4e} {v3:12.4e}")
self.logger.debug("...")
for v1, v2, v3 in zip(
ts[-9:-1], self.M[i][-9:-1], self.M_err[i][-9:-1]
):
self.logger.debug(f" {v1:.3f} {v2:12.4e} {v3:12.4e}")
self.logger.debug(f"{start=}")
self.logger.debug("--------")
self.logger.debug("")
try:
slope, err, xs, ys = fit_helfand_moment(
self.M[i], ts, sigma=self.M_err[i], start=start, logger=self.logger
)
fit.append(
{
"kappa": slope,
"stderr": err,
"xs": xs,
"ys": ys,
}
)
v, e = fmt_err(slope, 2 * err)
if style == "1-line":
alpha = self.tensor_labels[i][0]
table["K" + alpha].append(v)
table["e" + alpha].append(e)
elif style == "full":
table["Method"].append("Helfand Moments" if i == 0 else "")
table["Dir"].append(self.tensor_labels[i][0])
table["Kappa"].append(v)
table["±"].append("±")
table["95%"].append(e)
table["Units"].append("W/m/K" if i == 0 else "")
except Exception as e:
logger.warning("The fit of the Helfand moments failed. Continuing...")
logger.warning(e)
self.plot_helfand_moment(self.M, ts, M_err=self.M_err, fit=fit)
# Now for the Green-Kubo method
dt = Q_(self.timestep[0], "fs")
# Plot the correlation functions
ts = (np.arange(self.Jcf.shape[1]) * dt.m_as("ps")).tolist()
end = len(ts) // 4
self.plot_JJ_correlation(self.Jcf[:, :end], ts[:end], err=self.Jcf_err[:, :end])
# Fit the integrals to 1 - e(-tau/t0)
ts = np.arange(self.GK_integral.shape[1]) * dt.m_as("ps")
ts = ts.tolist()
fit = []
for i in range(3):
kappa, kappa_err, a, a_err, tau, tau_err, tf, yf = fit_green_kubo_integral(
self.GK_integral[i], ts, sigma=self.GK_integral_err[i]
)
fit.append(
{
"kappa": kappa,
"stderr": kappa_err,
"xs": tf,
"ys": yf,
"a": a,
"a_err": a_err,
"tau": tau,
"tau_err": tau_err,
}
)
v, e = fmt_err(kappa, 2 * kappa_err)
if style == "1-line":
if i == 0:
table["Run"].append("")
alpha = self.tensor_labels[i][0]
table["K" + alpha].append(v)
table["e" + alpha].append(e)
elif style == "full":
table["Method"].append("Green-Kubo" if i == 0 else "")
table["Dir"].append(self.tensor_labels[i][0])
table["Kappa"].append(v)
table["±"].append("±")
table["95%"].append(e)
table["Units"].append("W/m/K" if i == 0 else "")
for i in range(3, 6):
alpha = self.tensor_labels[i][0]
if style == "1-line":
# blank the off-diagonals
table["K" + alpha].append("")
table["e" + alpha].append("")
# Plot the Green-Kubo integrals
self.plot_GK_integral(self.GK_integral, ts, err=self.GK_integral_err, fit=fit)
# Print the table of results
if style == "1-line":
text = ""
tmp = tabulate(
table,
headers="keys",
tablefmt="simple_outline",
disable_numparse=True,
)
if len(self.V) == 2:
length = len(tmp.splitlines()[0])
text += "\n"
text += "Thermal Conductivity".center(length)
text += "\n"
text += "--------------------".center(length)
text += "\n"
text += "First line is the fit to Helfand derivative".center(length)
text += "\n"
text += "Second line is the slope of the Helfand moments".center(length)
text += "\n"
text += "Third line is the fit to Green-Kubo integral".center(length)
text += "\n"
text += "\n".join(tmp.splitlines()[0:-1])
else:
if run is not None and run == P["nruns"]:
text += "\n".join(tmp.splitlines()[-4:])
else:
text += "\n".join(tmp.splitlines()[-4:-1])
printer.normal(__(text, indent=8 * " ", wrap=False, dedent=False))
else:
text = ""
tmp = tabulate(
table,
headers="keys",
tablefmt="simple_outline",
disable_numparse=True,
colalign=(
"center",
"center",
"decimal",
"center",
"decimal",
"left",
),
)
length = len(tmp.splitlines()[0])
text += "\n"
text += "Thermal Conductivity".center(length)
text += "\n"
text += tmp
text += "\n"
printer.normal(__(text, indent=8 * " ", wrap=False, dedent=False))
[docs]
def plot_GK_integral(self, x, ts, err=None, fit=None, filename="GK_integral.graph"):
"""Create a plot of the GK integrals and any fit to them.
Parameters
----------
x : numpy.ndarray(6, m)
The cumulative integrals
ts : [float]
List of times for the points, in ps
err : numpy.ndarray(6, m)
The standard errors of the integrals (optional)
fit :
The fit parameters for the integrals (optional)
filename : str
The filename to write
"""
figure = self.create_figure(
module_path=("seamm",),
template="line.graph_template",
title="Green-Kubo Integrals",
)
max_tau = 0
for i in range(3):
tau = fit[i]["tau"][-1]
if tau > max_tau:
max_tau = tau
if max_tau * 4 > ts[-1]:
rng = None
else:
rng = [0, max_tau * 4]
plot_GK_integrals(
figure, x, ts, err=err, fit=fit, labels=self.tensor_labels, _range=rng
)
figure.grid_plots("GKI")
# Write to disk
path = Path(self.directory) / filename
figure.dump(path)
if "html" in self.options and self.options["html"]:
path = path.with_suffix(".html")
figure.template = "line.html_template"
figure.dump(path)
[docs]
def plot_helfand_moment(
self, M, ts, M_err=None, fit=None, filename="HelfandMoments.graph"
):
"""Create a plot of the Helfand moments and any fit to them.
Parameters
----------
M : numpy.ndarray(6, m)
The Helfand moments.
ts : [float]
List of times for the points, in ps
"""
figure = self.create_figure(
module_path=("seamm",),
template="line.graph_template",
title="Helfand Moments",
)
plot_helfand_moments(
figure, M, ts, err=M_err, fit=fit, labels=self.tensor_labels
)
figure.grid_plots("M")
# Write to disk
path = Path(self.directory) / filename
figure.dump(path)
if "html" in self.options and self.options["html"]:
path = path.with_suffix(".html")
figure.template = "line.html_template"
figure.dump(path)
[docs]
def plot_JJ_correlation(self, CCF, ts, err=None, fit=None, filename="JJ.graph"):
"""Create a plot of the orrelation functions and any fit to them.
Parameters
----------
CCF : numpy.ndarray(6, m)
The cross-correlation functions.
ts : [float]
List of times for the points, in ps
err : numpy.ndarray(6, m)
The standard errors of the CCF (optional)
fit :
The fit parameters for any fit of the CFF (optional)
filename : str
The filename to write
"""
figure = self.create_figure(
module_path=("seamm",),
template="line.graph_template",
title="Heat Flux Correlation Functions",
)
plot_correlation_functions(
figure, CCF, ts, err=err, fit=fit, labels=self.tensor_labels
)
figure.grid_plots("CCF")
# Write to disk
path = Path(self.directory) / filename
figure.dump(path)
if "html" in self.options and self.options["html"]:
path = path.with_suffix(".html")
figure.template = "line.html_template"
figure.dump(path)
[docs]
def process_run(self, run, run_dir):
"""Get the heat fluxes from the run and do initial processing.
Parameters
----------
run : int
The run number
run_dir : pathlib.Path
The toplevel directory of the run.
"""
paths = sorted(run_dir.glob("**/heat_flux.trj"))
if len(paths) == 0:
raise RuntimeError(
f"There is no heat flux data for run {run} in {run_dir}."
)
elif len(paths) > 1:
raise NotImplementedError(
f"Cannot handle multiple HeatFlux files from run {run} in {run_dir}."
)
# Process the trajectory data
control_properties = lambda x: x not in ["tstep"] # noqa: E731
with paths[0].open() as fd:
# Get the initial header line and check
line = fd.readline()
tmp = line.split()
if len(tmp) < 3:
raise RuntimeError(f"Bad header for {paths[0]}: {line}")
if tmp[0] != "!MolSSI":
raise RuntimeError(f"Not a MolSSI file? {paths[0]}: {line}")
if tmp[1] != "trajectory":
raise RuntimeError(f"Not a trajectory file? {paths[0]}: {line}")
if tmp[2][0] != "2":
raise RuntimeError(
f"Can only handle version 2 trajectory files. {paths[0]}: {line}"
)
metadata = json.loads(" ".join(tmp[3:]))
data = pandas.read_csv(
fd,
sep=" ",
header=0,
comment="!",
usecols=control_properties,
index_col=None,
)
dt = Q_(metadata["dt"], metadata["tunits"])
data = data.reset_index(drop=True)
data.index *= dt.m_as("fs")
Jx = data["Jx"].to_numpy()
Jy = data["Jy"].to_numpy()
Jz = data["Jz"].to_numpy()
J = np.stack((Jx, Jy, Jz))
# Get the state function V & T
path = paths[0].parent / "state.json"
if path.exists():
with path.open() as fd:
state = json.load(fd)
T = state["T"]["mean"]
if "V" in state:
V = state["V"]["mean"]
else:
_, configuration = self.get_system_configuration()
V = configuration.volume
self.V.append(V)
self.T.append(T)
else:
self.V.append(None)
self.T.append(None)
self.timestep.append(dt.magnitude)
# We need properties like the temperature and volume.
T = Q_(T, "K")
V = Q_(V, "Å^3")
k_B = Q_("k_B")
Jsq = Q_("W^2/m^4")
# Limit the lengths of the data
n = J.shape[1]
m = min(n // 10, 10000)
# Create the Helfand moments
constants = Jsq * V * dt**2 / (2 * k_B * T**2)
M = create_helfand_moments(J, m=m) * constants.m_as("W/m/K*ps")
self.Ms.append(M)
# And correlation functions for Green-Kubo method
constants = V / (k_B * T**2) * Jsq * dt
Jcf, integral = create_correlation_functions(J, m=m)
self.Jcfs.append(Jcf)
self.GK_integrals.append(integral * constants.m_as("W/m/K"))
# Merge the results and get the averages and errors
tmp = np.stack(self.Ms)
self.M = np.average(tmp, axis=0)
self.M_err = np.std(tmp, axis=0)
tmp = np.stack(self.Jcfs)
self.Jcf = np.average(tmp, axis=0)
self.Jcf_err = np.std(tmp, axis=0)
tmp = np.stack(self.GK_integrals)
self.GK_integral = np.average(tmp, axis=0)
self.GK_integral_err = np.std(tmp, axis=0)
[docs]
def set_id(self, node_id=()):
"""Sequentially number the subnodes"""
self.logger.debug("Setting ids for subflowchart {}".format(self))
if self.visited:
return None
else:
self.visited = True
self._id = node_id
self.set_subids(self._id)
return self.next()
[docs]
def set_subids(self, node_id=()):
"""Set the ids of the nodes in the subflowchart"""
node = self.subflowchart.get_node("1").next()
n = 1
while node is not None:
node = node.set_id((*node_id, str(n)))
n += 1