# -*- coding: utf-8 -*-
"""Non-graphical part of the Thermal Conductivity step in a SEAMM flowchart"""
import importlib
import json
import logging
from math import log10, ceil
from pathlib import Path
import sys
import traceback
import numpy as np
import pandas
from tabulate import tabulate
from .analysis import (
create_correlation_functions,
create_helfand_moments,
fit_green_kubo_integral,
fit_helfand_moment,
get_helfand_slope,
plot_correlation_functions,
plot_GK_integrals,
plot_helfand_moments,
plot_helfand_slopes,
)
from .thermal_conductivity_parameters import ThermalConductivityParameters
import thermal_conductivity_step
import molsystem
import seamm
import seamm_util
from seamm_util import Q_
import seamm_util.printing as printing
from seamm_util.printing import FormattedText as __
# In addition to the normal logger, two logger-like printing facilities are
# defined: "job" and "printer". "job" sends output to the main job.out file for
# the job, and should be used very sparingly, typically to echo what this step
# will do in the initial summary of the job.
#
# "printer" sends output to the file "step.out" in this step's working
# directory, and is used for all normal output from this step.
logger = logging.getLogger(__name__)
job = printing.getPrinter()
printer = printing.getPrinter("Thermal Conductivity")

# Add this module's properties to the standard properties
path = importlib.resources.files("thermal_conductivity_step") / "data"
csv_file = path / "properties.csv"
# Fix: test the file itself, not just the data directory. Previously this
# checked `path.exists()` and could crash if the directory existed but the
# properties.csv file did not.
if csv_file.exists():
    molsystem.add_properties_from_file(csv_file)
[docs]
def fmt_err(value, err, precision=2):
    """Format a value and its error with matching decimal places.

    The number of decimals is chosen so the error shows `precision`
    significant figures. If the error cannot be processed (zero, negative,
    or non-numeric) the error is shown as "--" and the value falls back to
    two decimal places. Values that cannot be formatted are returned as-is.

    Parameters
    ----------
    value :
        The value to format.
    err :
        The error (uncertainty) on the value.
    precision : int
        Number of significant figures to show in the error.

    Returns
    -------
    (str or object, str)
        The formatted value and error.
    """
    try:
        ndecimals = precision - ceil(log10(err))
    except Exception:
        # No usable error: mark it and fall back to a fixed format.
        try:
            return f"{value:.2f}", "--"
        except Exception:
            return value, "--"

    if ndecimals < 0:
        ndecimals = 0
    spec = f".{ndecimals}f"
    formatted_err = f"{err:{spec}}"
    try:
        formatted_value = f"{value:{spec}}"
    except Exception:
        formatted_value = value
    return formatted_value, formatted_err
[docs]
class ThermalConductivity(seamm.Node):
    """The non-graphical part of a Thermal Conductivity step in a flowchart.

    Attributes
    ----------
    parser : configargparse.ArgParser
        The parser object.
    options : tuple
        It contains a two item tuple containing the populated namespace and
        the list of remaining argument strings.
    subflowchart : seamm.Flowchart
        A SEAMM Flowchart object that represents a subflowchart, if needed.
    parameters : ThermalConductivityParameters
        The control parameters for Thermal Conductivity.

    See Also
    --------
    TkThermalConductivity,
    ThermalConductivity, ThermalConductivityParameters
    """
def __init__(
    self,
    flowchart=None,
    title="Thermal Conductivity",
    namespace="org.molssi.seamm",
    extension=None,
    logger=logger,
):
    """A step for Thermal Conductivity in a SEAMM flowchart.

    Parameters
    ----------
    flowchart : seamm.Flowchart
        The non-graphical flowchart that contains this step.
    title : str
        The name displayed in the flowchart.
    namespace : str
        The namespace for the plug-ins of the subflowchart
    extension : None
        Not yet implemented
    logger : Logger = logger
        The logger to use and pass to parent classes

    Returns
    -------
    None
    """
    logger.debug(f"Creating Thermal Conductivity {self}")

    # The subflowchart must exist before super().__init__, which may use it.
    self.subflowchart = seamm.Flowchart(
        parent=self, name="Thermal Conductivity", namespace=namespace
    )

    super().__init__(
        flowchart=flowchart,
        # Fix: honor the caller-supplied title. Previously the hard-coded
        # string "Thermal Conductivity" was passed, silently ignoring the
        # `title` parameter.
        title=title,
        extension=extension,
        module=__name__,
        logger=logger,
    )

    self._metadata = thermal_conductivity_step.metadata
    self.parameters = ThermalConductivityParameters()

    # (label, line color, fill color) for each tensor component in plots.
    self.tensor_labels = [
        ("xx", "red", "rgba(255,0,0,0.1)"),
        ("yy", "green", "rgba(0,255,0,0.1)"),
        ("zz", "blue", "rgba(0,0,255,0.1)"),
        ("xy", "rgb(127,127,0)", "rgba(127,127,0,0.1)"),
        ("xz", "rgb(255,0,255)", "rgba(255,0,255,0.1)"),
        ("yz", "rgb(0,255,255)", "rgba(0,255,255,0.1)"),
    ]

    self._file_handler = None  # Redirects per-run printing to Run.out

    # Various data accumulated over the runs
    self.J = None  # The heat fluxes, nruns x 3 x n
    self.V = []  # The volumes of the runs
    self.T = []  # The temperatures of the runs
    self.timestep = []  # The timestep in fs for the run
    self.Ms = []  # Helfand moments per run
    self.M = None  # The Helfand moments, 6 x m
    self.M_err = None  # The stderr on HM, 6 x m
    self.Jcfs = []  # The ccf's of J, 6 x n, per run
    self.Jcf = None  # The ccf's of J, 6 x n
    self.Jcf_err = None  # The stderr of ccf's, 6 x n
    self.GK_integrals = []  # Cumulative integral of Jcf, per run
    self.GK_integral = None  # Cumulative integral of Jcf
    self.GK_integral_err = None  # Error of cumulative integral of Jcf

    # Set our citation level to 1!
    self.citation_level = 1
@property
def version(self):
    """The semantic version of this module."""
    # Delegates to the package-level __version__ string.
    return thermal_conductivity_step.__version__
@property
def git_revision(self):
    """The git version of this module."""
    # Delegates to the package-level __git_revision__ string.
    return thermal_conductivity_step.__git_revision__
[docs]
def create_parser(self):
    """Set up the command-line / config-file parser for this step."""
    name = "thermal-conductivity-step"
    parser = seamm_util.getParser()

    # Share the main flowchart's parser with the subflowchart.
    self.subflowchart._parser = self.flowchart._parser

    # Remember whether the parser exists already: this type of step may
    # have been encountered before in the flowchart.
    already_defined = parser.exists(name)

    # Create the standard options, e.g. log-level
    super().create_parser(name=name)

    if not already_defined:
        # Any options for thermal conductivity itself
        parser.add_argument(
            name,
            "--html",
            action="store_true",
            help="whether to write out html files for graphs, etc.",
        )

    # Let every step in the subflowchart add its own options.
    self.subflowchart.reset_visited()
    step = self.subflowchart.get_node("1").next()
    while step is not None:
        step = step.create_parser()

    return self.next()
[docs]
def description_text(self, P=None, short=False):
    """Create the text description of what this step will do.

    The dictionary of control values is passed in as P so that
    the code can test values, etc.

    Parameters
    ----------
    P : dict
        An optional dictionary of the current values of the control
        parameters.
    short : bool
        If True, omit the descriptions of the subflowchart steps.

    Returns
    -------
    str
        A description of the current step.
    """
    if P is None:
        P = self.parameters.values_to_dict()

    text = (
        f"Calculate the thermal conductivity at {P['T']} using "
        f"the {P['approach']} approach. {P['nruns']} runs will be averaged for "
        "the final results.\n\n"
    )

    if not short:
        # Describe each step of the subflowchart as well.
        self.subflowchart.root_directory = self.flowchart.root_directory

        # Get the first real node
        node = self.subflowchart.get_node("1").next()

        while node is not None:
            try:
                text += str(__(node.description_text()))
            except Exception as e:
                msg = (
                    f"Error describing thermal_conductivity flowchart: {e} in "
                    f"{node}"
                )
                print(msg)
                self.logger.critical(msg)
                raise
            except:  # noqa: E722
                msg = (
                    "Unexpected error describing thermal_conductivity flowchart: "
                    f"{sys.exc_info()[0]} in {str(node)}"
                )
                print(msg)
                self.logger.critical(msg)
                raise
            text += "\n"
            node = node.next()

    return self.header + "\n" + str(__(text, **P, indent=4 * " "))
[docs]
def run(self):
    """Run a Thermal Conductivity step.

    Executes the subflowchart `nruns` times, accumulating the heat-flux
    data from each run, printing a one-line progress analysis after each
    run past the first and a full analysis at the end.

    Returns
    -------
    seamm.Node
        The next node object in the flowchart.
    """
    next_node = super().run(printer)

    # Get the values of the parameters, dereferencing any variables
    P = self.parameters.current_values_to_dict(
        context=seamm.flowchart_variables._data
    )

    # Print what we are doing
    printer.important(__(self.description_text(P, short=True), indent=self.indent))

    # Find the handler for job.out and set the level up. The saved levels
    # are restored around the per-run analysis and at the end.
    job_handler = None
    out_handler = None
    for handler in job.handlers:
        if (
            isinstance(handler, logging.FileHandler)
            and "job.out" in handler.baseFilename
        ):
            job_handler = handler
            job_level = job_handler.level
            job_handler.setLevel(printing.JOB)
        elif isinstance(handler, logging.StreamHandler):
            out_handler = handler
            out_level = out_handler.level
            out_handler.setLevel(printing.JOB)

    # Get the first real node
    first_node = self.subflowchart.get_node("1").next()

    # Ensure the nodes have their options
    node = first_node
    while node is not None:
        node.all_options = self.all_options
        node = node.next()

    # And the subflowchart has the executor
    self.subflowchart.executor = self.flowchart.executor

    # Loop over the runs
    nruns = P["nruns"]
    fmt = f"0{len(str(nruns))}"  # zero-pad run numbers, e.g. run_007
    for run in range(1, nruns + 1):
        # Direct most output to the run's own output file
        run_dir = Path(self.directory) / f"run_{run:{fmt}}"
        run_dir.mkdir(parents=True, exist_ok=True)

        # A handler for the file; close/remove any handler from the
        # previous run first.
        if self._file_handler is not None:
            self._file_handler.close()
            job.removeHandler(self._file_handler)
        path = run_dir / "Run.out"
        path.unlink(missing_ok=True)
        self._file_handler = logging.FileHandler(path)
        self._file_handler.setLevel(printing.NORMAL)
        formatter = logging.Formatter(fmt="{message:s}", style="{")
        self._file_handler.setFormatter(formatter)
        job.addHandler(self._file_handler)

        # Add the run to the ids so the directory structure is reasonable
        self.flowchart.reset_visited()
        self.set_subids((*self._id, f"run_{run:{fmt}}"))

        # Run through the steps in the loop body
        node = first_node
        try:
            while node is not None:
                node = node.run()
        except DeprecationWarning as e:
            printer.normal("\nDeprecation warning: " + str(e))
            traceback.print_exc(file=sys.stderr)
            traceback.print_exc(file=sys.stdout)
        except Exception as e:
            printer.job(f"Caught exception in run {run}: {str(e)}")
            with open(run_dir / "stderr.out", "a") as fd:
                traceback.print_exc(file=fd)
            # The "errors" parameter controls whether to press on, stop
            # looping, or propagate the error.
            if "continue" in P["errors"]:
                continue
            elif "exit" in P["errors"]:
                break
            else:
                raise
        else:
            self.process_run(run, run_dir)

        # Print a one-line progress summary once at least two runs exist.
        # (Fixed: was `if True and len(self.V) > 1:` -- dead `True and`
        # removed.)
        if len(self.V) > 1:
            if job_handler is not None:
                job_handler.setLevel(job_level)
            if out_handler is not None:
                out_handler.setLevel(out_level)
            self.analyze(P=P, style="1-line", run=run)
            if job_handler is not None:
                job_handler.setLevel(printing.JOB)
            if out_handler is not None:
                out_handler.setLevel(printing.JOB)
        self.logger.debug(f"End of run {run}")

    # Remove any redirection of printing.
    if self._file_handler is not None:
        self._file_handler.close()
        job.removeHandler(self._file_handler)
        self._file_handler = None
    if job_handler is not None:
        job_handler.setLevel(job_level)
    if out_handler is not None:
        out_handler.setLevel(out_level)

    # Analyze the results
    self.analyze(P=P, style="full")

    # Add other citations here or in the appropriate place in the code.
    # Add the bibtex to data/references.bib, and add a self.reference.cite
    # similar to the above to actually add the citation to the references.

    return next_node
[docs]
def analyze(self, indent="", P=None, style="full", run=None, **kwargs):
    """Do any analysis of the output from this step.

    Also print important results to the local step.out file using
    "printer".

    Parameters
    ----------
    indent : str
        An extra indentation for the output
    P : dict
        The current values of the control parameters; only "nruns" is
        read here (in the "1-line" branch).
    style : str
        "1-line" prints a compact per-run row; "full" prints the complete
        summary table.
    run : int
        The current run number when called from within the run loop.
    """
    # The table layout differs between the two styles.
    if style == "1-line":
        table = {
            "Run": [],
            "Kxx": [],
            "exx": [],
            "Kyy": [],
            "eyy": [],
            "Kzz": [],
            "ezz": [],
            "Kxy": [],
            "exy": [],
            "Kxz": [],
            "exz": [],
            "Kyz": [],
            "eyz": [],
        }
    elif style == "full":
        table = {
            "Method": [],
            "Dir": [],
            "Kappa": [],
            "±": [],
            "95%": [],
            "Units": [],
        }
    dt = Q_(self.timestep[0], "fs")
    ts = np.arange(self.M.shape[1]) * dt.m_as("ps")  # Scale to ps
    ts = ts.tolist()

    # Get and plot the slopes of the Helfand moments
    slopes = []
    xs = []
    errs = []
    fit0 = []
    for i in range(6):
        slope, x, err = get_helfand_slope(self.M[i], ts, sigma=self.M_err[i])
        slopes.append(slope)
        xs.append(x)
        errs.append(err)
        # Only the three diagonal components are fitted and tabulated.
        if i < 3:
            (
                kappa,
                kappa_err,
                a,
                a_err,
                tau,
                tau_err,
                tf,
                yf,
            ) = fit_green_kubo_integral(slope, x, sigma=err)
            fit0.append(
                {
                    "kappa": kappa,
                    "stderr": kappa_err,
                    "xs": tf,
                    "ys": yf,
                    "a": a,
                    "a_err": a_err,
                    "tau": tau,
                    "tau_err": tau_err,
                }
            )
            # 2 * stderr gives the ~95% confidence interval in the table.
            v, e = fmt_err(kappa, 2 * kappa_err)
            if style == "1-line":
                if i == 0:
                    table["Run"].append(len(self.V))
                alpha = self.tensor_labels[i][0]
                table["K" + alpha].append(v)
                table["e" + alpha].append(e)
            elif style == "full":
                table["Method"].append("Helfand Derivative" if i == 0 else "")
                table["Dir"].append(self.tensor_labels[i][0])
                table["Kappa"].append(v)
                table["±"].append("±")
                table["95%"].append(e)
                table["Units"].append("W/m/K" if i == 0 else "")
        else:
            if style == "1-line":
                # blank the off-diagonals
                alpha = self.tensor_labels[i][0]
                table["K" + alpha].append("")
                table["e" + alpha].append("")
    figure = self.create_figure(
        module_path=("seamm",),
        template="line.graph_template",
        title="Helfand Derivatives",
    )
    plot_helfand_slopes(
        figure, slopes, xs[0], err=errs, fit=fit0, labels=self.tensor_labels
    )
    figure.grid_plots("Slope")

    # Write to disk
    filename = "HelfandDerivatives.graph"
    path = Path(self.directory) / filename
    figure.dump(path)
    if "html" in self.options and self.options["html"]:
        path = path.with_suffix(".html")
        figure.template = "line.html_template"
        figure.dump(path)

    # Fit the slopes of the Helfand moments themselves.
    fit = []
    for i in range(6):
        # Start the fit well past the relaxation time found above, when
        # one is available for this component.
        if i < len(fit0):
            start = max(fit0[i]["tau"]) * 3
        else:
            start = 1
        if start < 1:
            start = 1
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("\n\n\n**********************\n")
            self.logger.debug(f"{i=}")
            for v1, v2, v3 in zip(ts[0:9], self.M[i][0:9], self.M_err[i][0:9]):
                self.logger.debug(f" {v1:.3f} {v2:12.4e} {v3:12.4e}")
            self.logger.debug("...")
            for v1, v2, v3 in zip(
                ts[-9:-1], self.M[i][-9:-1], self.M_err[i][-9:-1]
            ):
                self.logger.debug(f" {v1:.3f} {v2:12.4e} {v3:12.4e}")
            self.logger.debug(f"{start=}")
            self.logger.debug("--------")
            self.logger.debug("")
        try:
            # NOTE(review): this rebinds `xs`/`ys` from the slope lists
            # built in the previous loop; they are not used again as lists.
            slope, err, xs, ys = fit_helfand_moment(
                self.M[i], ts, sigma=self.M_err[i], start=start, logger=self.logger
            )
            fit.append(
                {
                    "kappa": slope,
                    "stderr": err,
                    "xs": xs,
                    "ys": ys,
                }
            )
            v, e = fmt_err(slope, 2 * err)
            if style == "1-line":
                alpha = self.tensor_labels[i][0]
                table["K" + alpha].append(v)
                table["e" + alpha].append(e)
            elif style == "full":
                table["Method"].append("Helfand Moments" if i == 0 else "")
                table["Dir"].append(self.tensor_labels[i][0])
                table["Kappa"].append(v)
                table["±"].append("±")
                table["95%"].append(e)
                table["Units"].append("W/m/K" if i == 0 else "")
        except Exception as e:
            # NOTE(review): uses the module-level logger here, unlike the
            # self.logger debug output above.
            logger.warning("The fit of the Helfand moments failed. Continuing...")
            logger.warning(e)
    self.plot_helfand_moment(self.M, ts, M_err=self.M_err, fit=fit)

    # Now for the Green-Kubo method
    dt = Q_(self.timestep[0], "fs")

    # Plot the correlation functions (only the first quarter of the data)
    ts = (np.arange(self.Jcf.shape[1]) * dt.m_as("ps")).tolist()
    end = len(ts) // 4
    self.plot_JJ_correlation(self.Jcf[:, :end], ts[:end], err=self.Jcf_err[:, :end])

    # Fit the integrals to 1 - e(-tau/t0)
    ts = np.arange(self.GK_integral.shape[1]) * dt.m_as("ps")
    ts = ts.tolist()
    fit = []
    for i in range(3):
        kappa, kappa_err, a, a_err, tau, tau_err, tf, yf = fit_green_kubo_integral(
            self.GK_integral[i], ts, sigma=self.GK_integral_err[i]
        )
        fit.append(
            {
                "kappa": kappa,
                "stderr": kappa_err,
                "xs": tf,
                "ys": yf,
                "a": a,
                "a_err": a_err,
                "tau": tau,
                "tau_err": tau_err,
            }
        )
        v, e = fmt_err(kappa, 2 * kappa_err)
        if style == "1-line":
            if i == 0:
                table["Run"].append("")
            alpha = self.tensor_labels[i][0]
            table["K" + alpha].append(v)
            table["e" + alpha].append(e)
        elif style == "full":
            table["Method"].append("Green-Kubo" if i == 0 else "")
            table["Dir"].append(self.tensor_labels[i][0])
            table["Kappa"].append(v)
            table["±"].append("±")
            table["95%"].append(e)
            table["Units"].append("W/m/K" if i == 0 else "")
    for i in range(3, 6):
        alpha = self.tensor_labels[i][0]
        if style == "1-line":
            # blank the off-diagonals
            table["K" + alpha].append("")
            table["e" + alpha].append("")

    # Plot the Green-Kubo integrals
    self.plot_GK_integral(self.GK_integral, ts, err=self.GK_integral_err, fit=fit)

    # Print the table of results
    if style == "1-line":
        text = ""
        tmp = tabulate(
            table,
            headers="keys",
            tablefmt="simple_outline",
            disable_numparse=True,
        )
        # The first time two runs exist, print the explanatory header and
        # the table header lines; afterwards print just the new rows.
        if len(self.V) == 2:
            length = len(tmp.splitlines()[0])
            text += "\n"
            text += "Thermal Conductivity".center(length)
            text += "\n"
            text += "--------------------".center(length)
            text += "\n"
            text += "First line is the fit to Helfand derivative".center(length)
            text += "\n"
            text += "Second line is the slope of the Helfand moments".center(length)
            text += "\n"
            text += "Third line is the fit to Green-Kubo integral".center(length)
            text += "\n"
            text += "\n".join(tmp.splitlines()[0:-1])
        else:
            # On the final run include the closing border of the table.
            if run is not None and run == P["nruns"]:
                text += "\n".join(tmp.splitlines()[-4:])
            else:
                text += "\n".join(tmp.splitlines()[-4:-1])
        printer.normal(__(text, indent=8 * " ", wrap=False, dedent=False))
    else:
        text = ""
        tmp = tabulate(
            table,
            headers="keys",
            tablefmt="simple_outline",
            disable_numparse=True,
            colalign=(
                "center",
                "center",
                "decimal",
                "center",
                "decimal",
                "left",
            ),
        )
        length = len(tmp.splitlines()[0])
        text += "\n"
        text += "Thermal Conductivity".center(length)
        text += "\n"
        text += tmp
        text += "\n"
        printer.normal(__(text, indent=8 * " ", wrap=False, dedent=False))
[docs]
def plot_GK_integral(self, x, ts, err=None, fit=None, filename="GK_integral.graph"):
    """Create a plot of the GK integrals and any fit to them.

    Parameters
    ----------
    x : numpy.ndarray(6, m)
        The cumulative integrals
    ts : [float]
        List of times for the points, in ps
    err : numpy.ndarray(6, m)
        The standard errors of the integrals (optional)
    fit :
        The fit parameters for the integrals (optional)
    filename : str
        The filename to write
    """
    figure = self.create_figure(
        module_path=("seamm",),
        template="line.graph_template",
        title="Green-Kubo Integrals",
    )

    # Show roughly four relaxation times, unless that exceeds the data.
    longest_tau = max(0, *(fit[i]["tau"][-1] for i in range(3)))
    rng = None if longest_tau * 4 > ts[-1] else [0, longest_tau * 4]

    plot_GK_integrals(
        figure, x, ts, err=err, fit=fit, labels=self.tensor_labels, _range=rng
    )
    figure.grid_plots("GKI")

    # Write to disk
    path = Path(self.directory) / filename
    figure.dump(path)
    if "html" in self.options and self.options["html"]:
        figure.template = "line.html_template"
        figure.dump(path.with_suffix(".html"))
[docs]
def plot_helfand_moment(
    self, M, ts, M_err=None, fit=None, filename="HelfandMoments.graph"
):
    """Create a plot of the Helfand moments and any fit to them.

    Parameters
    ----------
    M : numpy.ndarray(6, m)
        The Helfand moments.
    ts : [float]
        List of times for the points, in ps
    M_err : numpy.ndarray(6, m)
        The standard errors of the moments (optional)
    fit :
        The fit parameters for the moments (optional)
    filename : str
        The filename to write
    """
    figure = self.create_figure(
        module_path=("seamm",),
        template="line.graph_template",
        title="Helfand Moments",
    )
    plot_helfand_moments(figure, M, ts, err=M_err, fit=fit, labels=self.tensor_labels)
    figure.grid_plots("M")

    # Write to disk
    path = Path(self.directory) / filename
    figure.dump(path)
    if "html" in self.options and self.options["html"]:
        figure.template = "line.html_template"
        figure.dump(path.with_suffix(".html"))
[docs]
def plot_JJ_correlation(self, CCF, ts, err=None, fit=None, filename="JJ.graph"):
    """Create a plot of the correlation functions and any fit to them.

    Parameters
    ----------
    CCF : numpy.ndarray(6, m)
        The cross-correlation functions.
    ts : [float]
        List of times for the points, in ps
    err : numpy.ndarray(6, m)
        The standard errors of the CCF (optional)
    fit :
        The fit parameters for any fit of the CCF (optional)
    filename : str
        The filename to write
    """
    figure = self.create_figure(
        module_path=("seamm",),
        template="line.graph_template",
        title="Heat Flux Correlation Functions",
    )
    plot_correlation_functions(
        figure, CCF, ts, err=err, fit=fit, labels=self.tensor_labels
    )
    figure.grid_plots("CCF")

    # Write to disk
    path = Path(self.directory) / filename
    figure.dump(path)
    if "html" in self.options and self.options["html"]:
        figure.template = "line.html_template"
        figure.dump(path.with_suffix(".html"))
[docs]
def process_run(self, run, run_dir):
    """Get the heat fluxes from the run and do initial processing.

    Reads the heat-flux trajectory, builds the Helfand moments and the
    Green-Kubo correlation functions/integrals for this run, and updates
    the running averages and standard deviations over all runs so far.

    Parameters
    ----------
    run : int
        The run number
    run_dir : pathlib.Path
        The toplevel directory of the run.

    Raises
    ------
    RuntimeError
        If the heat-flux trajectory or the state file is missing or
        malformed.
    NotImplementedError
        If more than one heat-flux file is found for the run.
    """
    paths = sorted(run_dir.glob("**/heat_flux.trj"))

    if len(paths) == 0:
        raise RuntimeError(
            f"There is no heat flux data for run {run} in {run_dir}."
        )
    elif len(paths) > 1:
        raise NotImplementedError(
            f"Cannot handle multiple HeatFlux files from run {run} in {run_dir}."
        )

    # Process the trajectory data
    control_properties = lambda x: x not in ["tstep"]  # noqa: E731
    with paths[0].open() as fd:
        # Get the initial header line and check
        line = fd.readline()
        tmp = line.split()
        if len(tmp) < 3:
            raise RuntimeError(f"Bad header for {paths[0]}: {line}")
        if tmp[0] != "!MolSSI":
            raise RuntimeError(f"Not a MolSSI file? {paths[0]}: {line}")
        if tmp[1] != "trajectory":
            raise RuntimeError(f"Not a trajectory file? {paths[0]}: {line}")
        if tmp[2][0] != "2":
            raise RuntimeError(
                f"Can only handle version 2 trajectory files. {paths[0]}: {line}"
            )
        # The rest of the header line is JSON metadata (e.g. dt, tunits).
        metadata = json.loads(" ".join(tmp[3:]))
        data = pandas.read_csv(
            fd,
            sep=" ",
            header=0,
            comment="!",
            usecols=control_properties,
            index_col=None,
        )

    # Index the frames by time in fs.
    dt = Q_(metadata["dt"], metadata["tunits"])
    data = data.reset_index(drop=True)
    data.index *= dt.m_as("fs")

    Jx = data["Jx"].to_numpy()
    Jy = data["Jy"].to_numpy()
    Jz = data["Jz"].to_numpy()
    J = np.stack((Jx, Jy, Jz))

    # Get the state functions V & T
    path = paths[0].parent / "state.json"
    if path.exists():
        with path.open() as fd:
            state = json.load(fd)
        T = state["T"]["mean"]
        if "V" in state:
            V = state["V"]["mean"]
        else:
            # Constant-volume run: take the volume from the configuration.
            _, configuration = self.get_system_configuration()
            V = configuration.volume
        self.V.append(V)
        self.T.append(T)
    else:
        # Fix: previously this appended None to self.V/self.T and fell
        # through, crashing below with a NameError because T and V were
        # never bound. Fail early with a clear message instead, before
        # corrupting the accumulated state.
        raise RuntimeError(
            f"The state file {path} for run {run} is missing, so the "
            "temperature and volume needed for the thermal conductivity "
            "are not available."
        )
    self.timestep.append(dt.magnitude)

    # We need properties like the temperature and volume.
    T = Q_(T, "K")
    V = Q_(V, "Å^3")
    k_B = Q_("k_B")
    Jsq = Q_("W^2/m^4")

    # Limit the lengths of the data
    n = J.shape[1]
    m = min(n // 10, 10000)

    # Create the Helfand moments
    constants = Jsq * V * dt**2 / (2 * k_B * T**2)
    M = create_helfand_moments(J, m=m) * constants.m_as("W/m/K*ps")
    self.Ms.append(M)

    # And correlation functions for Green-Kubo method
    constants = V / (k_B * T**2) * Jsq * dt
    Jcf, integral = create_correlation_functions(J, m=m)
    self.Jcfs.append(Jcf)
    self.GK_integrals.append(integral * constants.m_as("W/m/K"))

    # Merge the results and get the averages and errors
    tmp = np.stack(self.Ms)
    self.M = np.average(tmp, axis=0)
    self.M_err = np.std(tmp, axis=0)
    tmp = np.stack(self.Jcfs)
    self.Jcf = np.average(tmp, axis=0)
    self.Jcf_err = np.std(tmp, axis=0)
    tmp = np.stack(self.GK_integrals)
    self.GK_integral = np.average(tmp, axis=0)
    self.GK_integral_err = np.std(tmp, axis=0)
[docs]
def set_id(self, node_id=()):
    """Sequentially number the subnodes"""
    self.logger.debug("Setting ids for subflowchart {}".format(self))

    # Guard: already numbered on this traversal.
    if self.visited:
        return None

    self.visited = True
    self._id = node_id
    self.set_subids(self._id)
    return self.next()
[docs]
def set_subids(self, node_id=()):
    """Set the ids of the nodes in the subflowchart"""
    counter = 1
    # Walk the linked list of steps, numbering each one under node_id.
    node = self.subflowchart.get_node("1").next()
    while node is not None:
        node = node.set_id((*node_id, str(counter)))
        counter += 1