# Source code for thermal_conductivity_step.thermal_conductivity

# -*- coding: utf-8 -*-

"""Non-graphical part of the Thermal Conductivity step in a SEAMM flowchart"""

import importlib
import importlib.resources
import json
import logging
from math import log10, ceil
from pathlib import Path
import sys
import traceback

import numpy as np
import pandas
from tabulate import tabulate

from .analysis import (
    create_correlation_functions,
    create_helfand_moments,
    fit_green_kubo_integral,
    fit_helfand_moment,
    get_helfand_slope,
    plot_correlation_functions,
    plot_GK_integrals,
    plot_helfand_moments,
    plot_helfand_slopes,
)
from .thermal_conductivity_parameters import ThermalConductivityParameters
import thermal_conductivity_step
import molsystem
import seamm
import seamm_util
from seamm_util import Q_
import seamm_util.printing as printing
from seamm_util.printing import FormattedText as __

# In addition to the normal logger, two logger-like printing facilities are
# defined: "job" and "printer". "job" send output to the main job.out file for
# the job, and should be used very sparingly, typically to echo what this step
# will do in the initial summary of the job.
#
# "printer" sends output to the file "step.out" in this steps working
# directory, and is used for all normal output from this step.

logger = logging.getLogger(__name__)
job = printing.getPrinter()
printer = printing.getPrinter("Thermal Conductivity")

# Add this module's properties to the standard properties
path = importlib.resources.files("thermal_conductivity_step") / "data"
csv_file = path / "properties.csv"
# Check for the actual file, not just the containing "data" directory, so a
# missing properties.csv does not crash the import of this module.
if csv_file.exists():
    molsystem.add_properties_from_file(csv_file)


def fmt_err(value, err, precision=2):
    """Format a value and its error with decimals matched to the error.

    The number of decimal places is chosen so that the error displays with
    ``precision`` significant figures, and the value is formatted with the
    same number of decimals.

    Parameters
    ----------
    value : float or str
        The value to format. Non-numeric values are passed through unchanged.
    err : float
        The error on the value. If it is zero, negative, or not a number the
        error is shown as "--".
    precision : int
        The number of significant figures to show for the error.

    Returns
    -------
    (str, str)
        The formatted value and error.
    """
    try:
        # Position of the leading significant digit of the error sets the
        # number of decimals to display.
        decimals = -ceil(log10(err)) + precision
    except Exception:
        # log10 failed: err is zero, negative, or not a number.
        e = "--"
        try:
            # Honor the requested precision in the fallback as well
            # (was hardcoded to 2 decimals).
            v = f"{value:.{precision}f}"
        except Exception:
            # value is not numeric; pass it through unchanged.
            v = value
    else:
        if decimals < 0:
            decimals = 0
        fmt = f".{decimals}f"
        e = f"{err:{fmt}}"
        try:
            v = f"{value:{fmt}}"
        except Exception:
            v = value
    return v, e
class ThermalConductivity(seamm.Node):
    """
    The non-graphical part of a Thermal Conductivity step in a flowchart.

    Attributes
    ----------
    parser : configargparse.ArgParser
        The parser object.

    options : tuple
        It contains a two item tuple containing the populated namespace and
        the list of remaining argument strings.

    subflowchart : seamm.Flowchart
        A SEAMM Flowchart object that represents a subflowchart, if needed.

    parameters : ThermalConductivityParameters
        The control parameters for Thermal Conductivity.

    See Also
    --------
    TkThermalConductivity, ThermalConductivity, ThermalConductivityParameters
    """

    def __init__(
        self,
        flowchart=None,
        title="Thermal Conductivity",
        namespace="org.molssi.seamm",
        extension=None,
        logger=logger,
    ):
        """A step for Thermal Conductivity in a SEAMM flowchart.

        You may wish to change the title above, which is the string displayed
        in the box representing the step in the flowchart.

        Parameters
        ----------
        flowchart : seamm.Flowchart
            The non-graphical flowchart that contains this step.
        title : str
            The name displayed in the flowchart.
        namespace : str
            The namespace for the plug-ins of the subflowchart
        extension : None
            Not yet implemented
        logger : Logger = logger
            The logger to use and pass to parent classes

        Returns
        -------
        None
        """
        logger.debug(f"Creating Thermal Conductivity {self}")

        # The subflowchart holds the MD steps executed once per run.
        self.subflowchart = seamm.Flowchart(
            parent=self, name="Thermal Conductivity", namespace=namespace
        )

        super().__init__(
            flowchart=flowchart,
            title="Thermal Conductivity",
            extension=extension,
            module=__name__,
            logger=logger,
        )

        self._metadata = thermal_conductivity_step.metadata
        self.parameters = ThermalConductivityParameters()

        # (label, line color, fill color) for the six tensor components,
        # used consistently in all the plots.
        self.tensor_labels = [
            ("xx", "red", "rgba(255,0,0,0.1)"),
            ("yy", "green", "rgba(0,255,0,0.1)"),
            ("zz", "blue", "rgba(0,0,255,0.1)"),
            ("xy", "rgb(127,127,0)", "rgba(127,127,0,0.1)"),
            ("xz", "rgb(255,0,255)", "rgba(255,0,255,0.1)"),
            ("yz", "rgb(0,255,255)", "rgba(0,255,255,0.1)"),
        ]

        # Handler redirecting per-run output to Run.out; managed in run().
        self._file_handler = None

        # Various data
        self.J = None  # The heat fluxes, nruns x 3 x n
        self.V = []  # The volumes of the runs
        self.T = []  # The temperatures of the runs
        self.timestep = []  # The timestep in fs for the run
        self.Ms = []  # Helfand moments per run
        self.M = None  # The Helfand moments, 6 x m
        self.M_err = None  # The stderr on HM, 6 x m
        self.Jcfs = []  # The ccf's of J, 6 x n, per run
        self.Jcf = None  # The ccf's of J, 6 x n
        self.Jcf_err = None  # The stderr of ccf's, 6 x n
        self.GK_integrals = []  # Cumulative integral of Jcf, per run
        self.GK_integral = None  # Cumulative integral of Jcf
        self.GK_integral_err = None  # Error of cumulative integral of Jcf

        # Set our citation level to 1!
        self.citation_level = 1

    @property
    def version(self):
        """The semantic version of this module."""
        return thermal_conductivity_step.__version__

    @property
    def git_revision(self):
        """The git version of this module."""
        return thermal_conductivity_step.__git_revision__
    def create_parser(self):
        """Setup the command-line / config file parser

        Adds this step's options to the shared parser, then walks the
        subflowchart so each sub-step can add its own options.

        Returns
        -------
        seamm.Node
            The next node in the flowchart.
        """
        parser_name = "thermal-conductivity-step"
        parser = seamm_util.getParser()

        # The subflowchart shares the main flowchart's parser.
        self.subflowchart._parser = self.flowchart._parser

        # Remember if the parser exists ... this type of step may have been
        # found before
        parser_exists = parser.exists(parser_name)

        # Create the standard options, e.g. log-level
        super().create_parser(name=parser_name)

        if not parser_exists:
            # Any options for thermal conductivity itself
            parser.add_argument(
                parser_name,
                "--html",
                action="store_true",
                help="whether to write out html files for graphs, etc.",
            )

        # Now need to walk through the steps in the subflowchart...
        self.subflowchart.reset_visited()
        node = self.subflowchart.get_node("1").next()
        while node is not None:
            node = node.create_parser()

        return self.next()
    def description_text(self, P=None, short=False):
        """Create the text description of what this step will do.

        The dictionary of control values is passed in as P so that the code
        can test values, etc.

        Parameters
        ----------
        P : dict
            An optional dictionary of the current values of the control
            parameters.
        short : bool
            If True, omit the descriptions of the subflowchart steps.

        Returns
        -------
        str
            A description of the current step.
        """
        if P is None:
            P = self.parameters.values_to_dict()

        text = (
            f"Calculate the thermal conductivity at {P['T']} using "
            f"the {P['approach']} approach. {P['nruns']} runs will be averaged for "
            "the final results.\n\n"
        )

        if not short:
            # The subflowchart
            self.subflowchart.root_directory = self.flowchart.root_directory

            # Get the first real node
            node = self.subflowchart.get_node("1").next()

            while node is not None:
                try:
                    text += __(node.description_text()).__str__()
                except Exception as e:
                    print(
                        f"Error describing thermal_conductivity flowchart: {e} in "
                        f"{node}"
                    )
                    self.logger.critical(
                        f"Error describing thermal_conductivity flowchart: {e} in "
                        f"{node}"
                    )
                    raise
                # The bare except catches BaseException subclasses (e.g.
                # KeyboardInterrupt) not caught by the clause above.
                except:  # noqa: E722
                    print(
                        "Unexpected error describing thermal_conductivity flowchart: "
                        f"{sys.exc_info()[0]} in {str(node)}"
                    )
                    self.logger.critical(
                        "Unexpected error describing thermal_conductivity flowchart: "
                        f"{sys.exc_info()[0]} in {str(node)}"
                    )
                    raise
                text += "\n"
                node = node.next()

        return self.header + "\n" + __(text, **P, indent=4 * " ").__str__()
[docs] def run(self): """Run a Thermal Conductivity step. Parameters ---------- None Returns ------- seamm.Node The next node object in the flowchart. """ next_node = super().run(printer) # Get the values of the parameters, dereferencing any variables P = self.parameters.current_values_to_dict( context=seamm.flowchart_variables._data ) # Print what we are doing printer.important(__(self.description_text(P, short=True), indent=self.indent)) # Find the handler for job.out and set the level up job_handler = None out_handler = None for handler in job.handlers: if ( isinstance(handler, logging.FileHandler) and "job.out" in handler.baseFilename ): job_handler = handler job_level = job_handler.level job_handler.setLevel(printing.JOB) elif isinstance(handler, logging.StreamHandler): out_handler = handler out_level = out_handler.level out_handler.setLevel(printing.JOB) # Get the first real node first_node = self.subflowchart.get_node("1").next() # Ensure the nodes have their options node = first_node while node is not None: node.all_options = self.all_options node = node.next() # And the subflowchart has the executor self.subflowchart.executor = self.flowchart.executor # Loop over the runs nruns = P["nruns"] fmt = f"0{len(str(nruns))}" for run in range(1, nruns + 1): # Direct most output to iteration.out run_dir = Path(self.directory) / f"run_{run:{fmt}}" run_dir.mkdir(parents=True, exist_ok=True) # A handler for the file if self._file_handler is not None: self._file_handler.close() job.removeHandler(self._file_handler) path = run_dir / "Run.out" path.unlink(missing_ok=True) self._file_handler = logging.FileHandler(path) self._file_handler.setLevel(printing.NORMAL) formatter = logging.Formatter(fmt="{message:s}", style="{") self._file_handler.setFormatter(formatter) job.addHandler(self._file_handler) # Add the run to the ids so the directory structure is reasonable self.flowchart.reset_visited() self.set_subids((*self._id, f"run_{run:{fmt}}")) # Run through the steps in the 
loop body node = first_node try: while node is not None: node = node.run() except DeprecationWarning as e: printer.normal("\nDeprecation warning: " + str(e)) traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stdout) except Exception as e: printer.job(f"Caught exception in run {run}: {str(e)}") with open(run_dir / "stderr.out", "a") as fd: traceback.print_exc(file=fd) if "continue" in P["errors"]: continue elif "exit" in P["errors"]: break else: raise else: self.process_run(run, run_dir) if True and len(self.V) > 1: if job_handler is not None: job_handler.setLevel(job_level) if out_handler is not None: out_handler.setLevel(out_level) self.analyze(P=P, style="1-line", run=run) if job_handler is not None: job_handler.setLevel(printing.JOB) if out_handler is not None: out_handler.setLevel(printing.JOB) self.logger.debug(f"End of run {run}") # Remove any redirection of printing. if self._file_handler is not None: self._file_handler.close() job.removeHandler(self._file_handler) self._file_handler = None if job_handler is not None: job_handler.setLevel(job_level) if out_handler is not None: out_handler.setLevel(out_level) # Analyze the results self.analyze(P=P, style="full") # Add other citations here or in the appropriate place in the code. # Add the bibtex to data/references.bib, and add a self.reference.cite # similar to the above to actually add the citation to the references. return next_node
    def analyze(self, indent="", P=None, style="full", run=None, **kwargs):
        """Do any analysis of the output from this step.

        Also print important results to the local step.out file using
        "printer". Three estimates of the conductivity tensor are produced:
        a fit of the Helfand-moment derivatives, a linear fit of the Helfand
        moments, and a fit of the Green-Kubo integrals.

        Parameters
        ----------
        indent : str
            An extra indentation for the output
        P : dict
            The current control parameters (optional)
        style : str
            "1-line" for a per-run summary row, "full" for the final tables.
            NOTE(review): any other value leaves ``table`` undefined and
            would raise NameError below — confirm callers only pass these two.
        run : int
            The current run number, used by the "1-line" style (optional)
        """
        if style == "1-line":
            table = {
                "Run": [],
                "Kxx": [],
                "exx": [],
                "Kyy": [],
                "eyy": [],
                "Kzz": [],
                "ezz": [],
                "Kxy": [],
                "exy": [],
                "Kxz": [],
                "exz": [],
                "Kyz": [],
                "eyz": [],
            }
        elif style == "full":
            table = {
                "Method": [],
                "Dir": [],
                "Kappa": [],
                "±": [],
                "95%": [],
                "Units": [],
            }

        dt = Q_(self.timestep[0], "fs")
        ts = np.arange(self.M.shape[1]) * dt.m_as("ps")  # Scale to ps
        ts = ts.tolist()

        # Get and plot the slopes of the Helfand moments
        slopes = []
        xs = []
        errs = []
        fit0 = []
        for i in range(6):
            slope, x, err = get_helfand_slope(self.M[i], ts, sigma=self.M_err[i])
            slopes.append(slope)
            xs.append(x)
            errs.append(err)
            if i < 3:
                # Only the diagonal components are fit; the fit reports a
                # 95% interval as 2*stderr below.
                (
                    kappa,
                    kappa_err,
                    a,
                    a_err,
                    tau,
                    tau_err,
                    tf,
                    yf,
                ) = fit_green_kubo_integral(slope, x, sigma=err)
                fit0.append(
                    {
                        "kappa": kappa,
                        "stderr": kappa_err,
                        "xs": tf,
                        "ys": yf,
                        "a": a,
                        "a_err": a_err,
                        "tau": tau,
                        "tau_err": tau_err,
                    }
                )
                v, e = fmt_err(kappa, 2 * kappa_err)
                if style == "1-line":
                    if i == 0:
                        table["Run"].append(len(self.V))
                    alpha = self.tensor_labels[i][0]
                    table["K" + alpha].append(v)
                    table["e" + alpha].append(e)
                elif style == "full":
                    table["Method"].append("Helfand Derivative" if i == 0 else "")
                    table["Dir"].append(self.tensor_labels[i][0])
                    table["Kappa"].append(v)
                    table["±"].append("±")
                    table["95%"].append(e)
                    table["Units"].append("W/m/K" if i == 0 else "")
            else:
                if style == "1-line":
                    # blank the off-diagonals
                    alpha = self.tensor_labels[i][0]
                    table["K" + alpha].append("")
                    table["e" + alpha].append("")

        figure = self.create_figure(
            module_path=("seamm",),
            template="line.graph_template",
            title="Helfand Derivatives",
        )
        plot_helfand_slopes(
            figure, slopes, xs[0], err=errs, fit=fit0, labels=self.tensor_labels
        )
        figure.grid_plots("Slope")

        # Write to disk
        filename = "HelfandDerivatives.graph"
        path = Path(self.directory) / filename
        figure.dump(path)

        if "html" in self.options and self.options["html"]:
            path = path.with_suffix(".html")
            figure.template = "line.html_template"
            figure.dump(path)

        # Fit the slopes
        fit = []
        for i in range(6):
            # Start the linear fit well past the relaxation time of the
            # corresponding derivative fit, when one is available.
            if i < len(fit0):
                start = max(fit0[i]["tau"]) * 3
            else:
                start = 1
            if start < 1:
                start = 1

            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug("\n\n\n**********************\n")
                self.logger.debug(f"{i=}")
                for v1, v2, v3 in zip(ts[0:9], self.M[i][0:9], self.M_err[i][0:9]):
                    self.logger.debug(f" {v1:.3f} {v2:12.4e} {v3:12.4e}")
                self.logger.debug("...")
                for v1, v2, v3 in zip(
                    ts[-9:-1], self.M[i][-9:-1], self.M_err[i][-9:-1]
                ):
                    self.logger.debug(f" {v1:.3f} {v2:12.4e} {v3:12.4e}")
                self.logger.debug(f"{start=}")
                self.logger.debug("--------")
                self.logger.debug("")

            try:
                # NOTE(review): this rebinds the loop-local names xs/ys,
                # shadowing the xs list built above (only xs[0] was used).
                slope, err, xs, ys = fit_helfand_moment(
                    self.M[i], ts, sigma=self.M_err[i], start=start, logger=self.logger
                )
                fit.append(
                    {
                        "kappa": slope,
                        "stderr": err,
                        "xs": xs,
                        "ys": ys,
                    }
                )
                v, e = fmt_err(slope, 2 * err)
                if style == "1-line":
                    alpha = self.tensor_labels[i][0]
                    table["K" + alpha].append(v)
                    table["e" + alpha].append(e)
                elif style == "full":
                    table["Method"].append("Helfand Moments" if i == 0 else "")
                    table["Dir"].append(self.tensor_labels[i][0])
                    table["Kappa"].append(v)
                    table["±"].append("±")
                    table["95%"].append(e)
                    table["Units"].append("W/m/K" if i == 0 else "")
            except Exception as e:
                # Best-effort: a failed fit for one component is logged and
                # skipped rather than aborting the analysis.
                logger.warning("The fit of the Helfand moments failed. Continuing...")
                logger.warning(e)

        self.plot_helfand_moment(self.M, ts, M_err=self.M_err, fit=fit)

        # Now for the Green-Kubo method
        dt = Q_(self.timestep[0], "fs")

        # Plot the correlation functions, truncated to the first quarter.
        ts = (np.arange(self.Jcf.shape[1]) * dt.m_as("ps")).tolist()
        end = len(ts) // 4
        self.plot_JJ_correlation(self.Jcf[:, :end], ts[:end], err=self.Jcf_err[:, :end])

        # Fit the integrals to 1 - e(-tau/t0)
        ts = np.arange(self.GK_integral.shape[1]) * dt.m_as("ps")
        ts = ts.tolist()
        fit = []
        for i in range(3):
            kappa, kappa_err, a, a_err, tau, tau_err, tf, yf = fit_green_kubo_integral(
                self.GK_integral[i], ts, sigma=self.GK_integral_err[i]
            )
            fit.append(
                {
                    "kappa": kappa,
                    "stderr": kappa_err,
                    "xs": tf,
                    "ys": yf,
                    "a": a,
                    "a_err": a_err,
                    "tau": tau,
                    "tau_err": tau_err,
                }
            )
            v, e = fmt_err(kappa, 2 * kappa_err)
            if style == "1-line":
                if i == 0:
                    table["Run"].append("")
                alpha = self.tensor_labels[i][0]
                table["K" + alpha].append(v)
                table["e" + alpha].append(e)
            elif style == "full":
                table["Method"].append("Green-Kubo" if i == 0 else "")
                table["Dir"].append(self.tensor_labels[i][0])
                table["Kappa"].append(v)
                table["±"].append("±")
                table["95%"].append(e)
                table["Units"].append("W/m/K" if i == 0 else "")
        for i in range(3, 6):
            alpha = self.tensor_labels[i][0]
            if style == "1-line":
                # blank the off-diagonals
                table["K" + alpha].append("")
                table["e" + alpha].append("")

        # Plot the Green-Kubo integrals
        self.plot_GK_integral(self.GK_integral, ts, err=self.GK_integral_err, fit=fit)

        # Print the table of results
        if style == "1-line":
            text = ""
            tmp = tabulate(
                table,
                headers="keys",
                tablefmt="simple_outline",
                disable_numparse=True,
            )
            if len(self.V) == 2:
                # First 1-line report: print the explanatory header once.
                length = len(tmp.splitlines()[0])
                text += "\n"
                text += "Thermal Conductivity".center(length)
                text += "\n"
                text += "--------------------".center(length)
                text += "\n"
                text += "First line is the fit to Helfand derivative".center(length)
                text += "\n"
                text += "Second line is the slope of the Helfand moments".center(length)
                text += "\n"
                text += "Third line is the fit to Green-Kubo integral".center(length)
                text += "\n"
                text += "\n".join(tmp.splitlines()[0:-1])
            else:
                # Subsequent reports: just the data rows; keep the closing
                # rule only on the last run.
                if run is not None and run == P["nruns"]:
                    text += "\n".join(tmp.splitlines()[-4:])
                else:
                    text += "\n".join(tmp.splitlines()[-4:-1])
            printer.normal(__(text, indent=8 * " ", wrap=False, dedent=False))
        else:
            text = ""
            tmp = tabulate(
                table,
                headers="keys",
                tablefmt="simple_outline",
                disable_numparse=True,
                colalign=(
                    "center",
                    "center",
                    "decimal",
                    "center",
                    "decimal",
                    "left",
                ),
            )
            length = len(tmp.splitlines()[0])
            text += "\n"
            text += "Thermal Conductivity".center(length)
            text += "\n"
            text += tmp
            text += "\n"
            printer.normal(__(text, indent=8 * " ", wrap=False, dedent=False))
[docs] def plot_GK_integral(self, x, ts, err=None, fit=None, filename="GK_integral.graph"): """Create a plot of the GK integrals and any fit to them. Parameters ---------- x : numpy.ndarray(6, m) The cumulative integrals ts : [float] List of times for the points, in ps err : numpy.ndarray(6, m) The standard errors of the integrals (optional) fit : The fit parameters for the integrals (optional) filename : str The filename to write """ figure = self.create_figure( module_path=("seamm",), template="line.graph_template", title="Green-Kubo Integrals", ) max_tau = 0 for i in range(3): tau = fit[i]["tau"][-1] if tau > max_tau: max_tau = tau if max_tau * 4 > ts[-1]: rng = None else: rng = [0, max_tau * 4] plot_GK_integrals( figure, x, ts, err=err, fit=fit, labels=self.tensor_labels, _range=rng ) figure.grid_plots("GKI") # Write to disk path = Path(self.directory) / filename figure.dump(path) if "html" in self.options and self.options["html"]: path = path.with_suffix(".html") figure.template = "line.html_template" figure.dump(path)
[docs] def plot_helfand_moment( self, M, ts, M_err=None, fit=None, filename="HelfandMoments.graph" ): """Create a plot of the Helfand moments and any fit to them. Parameters ---------- M : numpy.ndarray(6, m) The Helfand moments. ts : [float] List of times for the points, in ps """ figure = self.create_figure( module_path=("seamm",), template="line.graph_template", title="Helfand Moments", ) plot_helfand_moments( figure, M, ts, err=M_err, fit=fit, labels=self.tensor_labels ) figure.grid_plots("M") # Write to disk path = Path(self.directory) / filename figure.dump(path) if "html" in self.options and self.options["html"]: path = path.with_suffix(".html") figure.template = "line.html_template" figure.dump(path)
[docs] def plot_JJ_correlation(self, CCF, ts, err=None, fit=None, filename="JJ.graph"): """Create a plot of the orrelation functions and any fit to them. Parameters ---------- CCF : numpy.ndarray(6, m) The cross-correlation functions. ts : [float] List of times for the points, in ps err : numpy.ndarray(6, m) The standard errors of the CCF (optional) fit : The fit parameters for any fit of the CFF (optional) filename : str The filename to write """ figure = self.create_figure( module_path=("seamm",), template="line.graph_template", title="Heat Flux Correlation Functions", ) plot_correlation_functions( figure, CCF, ts, err=err, fit=fit, labels=self.tensor_labels ) figure.grid_plots("CCF") # Write to disk path = Path(self.directory) / filename figure.dump(path) if "html" in self.options and self.options["html"]: path = path.with_suffix(".html") figure.template = "line.html_template" figure.dump(path)
    def process_run(self, run, run_dir):
        """Get the heat fluxes from the run and do initial processing.

        Reads the heat-flux trajectory, converts it to Helfand moments and
        Green-Kubo correlation functions/integrals in W/m/K, and updates the
        running averages and standard deviations over all runs so far.

        Parameters
        ----------
        run : int
            The run number
        run_dir : pathlib.Path
            The toplevel directory of the run.
        """
        paths = sorted(run_dir.glob("**/heat_flux.trj"))

        if len(paths) == 0:
            raise RuntimeError(
                f"There is no heat flux data for run {run} in {run_dir}."
            )
        elif len(paths) > 1:
            raise NotImplementedError(
                f"Cannot handle multiple HeatFlux files from run {run} in {run_dir}."
            )

        # Process the trajectory data; skip the "tstep" column.
        control_properties = lambda x: x not in ["tstep"]  # noqa: E731
        with paths[0].open() as fd:
            # Get the initial header line and check
            line = fd.readline()
            tmp = line.split()
            if len(tmp) < 3:
                raise RuntimeError(f"Bad header for {paths[0]}: {line}")
            if tmp[0] != "!MolSSI":
                raise RuntimeError(f"Not a MolSSI file? {paths[0]}: {line}")
            if tmp[1] != "trajectory":
                raise RuntimeError(f"Not a trajectory file? {paths[0]}: {line}")
            if tmp[2][0] != "2":
                raise RuntimeError(
                    f"Can only handle version 2 trajectory files. {paths[0]}: {line}"
                )
            # The rest of the header line is JSON metadata (dt, tunits, ...).
            metadata = json.loads(" ".join(tmp[3:]))
            data = pandas.read_csv(
                fd,
                sep=" ",
                header=0,
                comment="!",
                usecols=control_properties,
                index_col=None,
            )

        dt = Q_(metadata["dt"], metadata["tunits"])
        # Index the frames by time in fs.
        data = data.reset_index(drop=True)
        data.index *= dt.m_as("fs")

        # Heat flux components as a 3 x n array.
        Jx = data["Jx"].to_numpy()
        Jy = data["Jy"].to_numpy()
        Jz = data["Jz"].to_numpy()
        J = np.stack((Jx, Jy, Jz))

        # Get the state function V & T
        path = paths[0].parent / "state.json"
        if path.exists():
            with path.open() as fd:
                state = json.load(fd)
            T = state["T"]["mean"]
            if "V" in state:
                V = state["V"]["mean"]
            else:
                # Fall back to the configuration's volume (e.g. NVT runs).
                _, configuration = self.get_system_configuration()
                V = configuration.volume
            self.V.append(V)
            self.T.append(T)
        else:
            # NOTE(review): when state.json is missing, T and V are left
            # unbound here and the Q_ calls below would raise NameError —
            # confirm state.json is always written by the subflowchart.
            self.V.append(None)
            self.T.append(None)
        self.timestep.append(dt.magnitude)

        # We need properties like the temperature and volume.
        T = Q_(T, "K")
        V = Q_(V, "Å^3")
        k_B = Q_("k_B")
        Jsq = Q_("W^2/m^4")

        # Limit the lengths of the data
        n = J.shape[1]
        m = min(n // 10, 10000)

        # Create the Helfand moments. The prefactor carries the units so the
        # moments come out in W/m/K*ps.
        constants = Jsq * V * dt**2 / (2 * k_B * T**2)
        M = create_helfand_moments(J, m=m) * constants.m_as("W/m/K*ps")
        self.Ms.append(M)

        # And correlation functions for Green-Kubo method
        constants = V / (k_B * T**2) * Jsq * dt
        Jcf, integral = create_correlation_functions(J, m=m)
        self.Jcfs.append(Jcf)
        self.GK_integrals.append(integral * constants.m_as("W/m/K"))

        # Merge the results and get the averages and errors
        tmp = np.stack(self.Ms)
        self.M = np.average(tmp, axis=0)
        self.M_err = np.std(tmp, axis=0)

        tmp = np.stack(self.Jcfs)
        self.Jcf = np.average(tmp, axis=0)
        self.Jcf_err = np.std(tmp, axis=0)

        tmp = np.stack(self.GK_integrals)
        self.GK_integral = np.average(tmp, axis=0)
        self.GK_integral_err = np.std(tmp, axis=0)
[docs] def set_id(self, node_id=()): """Sequentially number the subnodes""" self.logger.debug("Setting ids for subflowchart {}".format(self)) if self.visited: return None else: self.visited = True self._id = node_id self.set_subids(self._id) return self.next()
[docs] def set_subids(self, node_id=()): """Set the ids of the nodes in the subflowchart""" node = self.subflowchart.get_node("1").next() n = 1 while node is not None: node = node.set_id((*node_id, str(n))) n += 1