Source code for lammps_step.lammps

# -*- coding: utf-8 -*-

"""A node or step for LAMMPS in a flowchart"""

import configparser
from contextlib import contextmanager
import copy
import importlib
import json
import logging
from math import sqrt, exp, degrees, radians, cos, acos
from pathlib import Path
import os
import os.path
import pkg_resources
import pprint
import shutil
import string
import sys
import traceback
import warnings

import bibtexparser
import numpy as np
import pandas
from scipy import stats
import statsmodels.tsa.stattools as stattools

import lammps_step
import molsystem
import seamm
import seamm_exec
from seamm_ff_util import tabulate_angle
import seamm_util
import seamm_util.printing as printing
from seamm_util import CompactJSONEncoder, Configuration
from seamm_util.printing import FormattedText as __

# from pymbar import timeseries

logger = logging.getLogger("lammps")
job = printing.getPrinter()
printer = printing.getPrinter("lammps")


# Temporarily used here to stop pymbar's annoying warning.
[docs] @contextmanager def logging_disabled(highest_level=logging.CRITICAL): """ A context manager that will prevent any logging messages triggered during the body from being processed. :param highest_level: the maximum logging level in use. This would only need to be changed if a custom level greater than CRITICAL is defined. From Simon Weber https://gist.github.com/simon-weber/7853144 """ # two kind-of hacks here: # * can't get the highest logging level in effect => delegate to the user # * can't get the current module-level override => use an undocumented # (but non-private!) interface previous_level = logging.root.manager.disable logging.disable(highest_level) try: yield finally: logging.disable(previous_level)
with logging_disabled(highest_level=logging.WARNING): from pymbar import timeseries # Add LAMMPS's properties to the standard properties path = Path(pkg_resources.resource_filename(__name__, "data/")) csv_file = path / "properties.csv" molsystem.add_properties_from_file(csv_file) bond_style = { "quadratic_bond": "harmonic", "quartic_bond": "class2", "fene": "fene", "morse": "morse", } angle_style = { "quadratic_angle": "harmonic", "quartic_angle": "class2", "simple_fourier_angle": "fourier/simple", "tabulated_angle": "table", } dihedral_style = { "torsion_1": "harmonic", "torsion_3": "class2", "torsion_opls": "opls", } improper_style = { "wilson_out_of_plane": "class2", "improper_opls": "cvff", }
[docs] class LAMMPS(seamm.Node): display_units = { "T": "K", "P": "atm", "t": "fs", "V": "Å^3", "density": "g/mL", "a": "Å", "b": "Å", "c": "Å", "Etot": "kcal/mol", "Eke": "kcal/mol", "Epe": "kcal/mol", "Emol": "kcal/mol", "Epair": "kcal/mol", "Jx": "W/m^2", "Jy": "W/m^2", "Jz": "W/m^2", "Kappa_x": "W/K/m", "Kappa_y": "W/K/m", "Kappa_z": "W/K/m", "Kappa": "W/K/m", "u": "kcal/mol", } display_title = { "T": "Temperature", "P": "Pressure", "t": "Time", "V": "Volume", "density": "Density", "a": "a lattice parameter", "b": "b lattice parameter", "c": "c lattice parameter", "Etot": "Total Energy", "Eke": "Kinetic Energy", "Epe": "Potential Energy", "Emol": "Molecular Energy, Valence Terms", "Epair": "Pair (Nonbond) Energy", "Jx": "heat flux in x", "Jy": "heat flux in y", "Jz": "heat flux in z", "Kappa_x": "thermal conductivity in x", "Kappa_y": "thermal conductivity in y", "Kappa_z": "thermal conductivity in z", "Kappa": "thermal conductivity", "u": "atom PE", } def __init__( self, flowchart=None, namespace="org.molssi.seamm.lammps", extension=None ): """Setup the main LAMMPS step Keyword arguments: """ logger.debug("Creating LAMMPS {}".format(self)) # The subflowchart self.subflowchart = seamm.Flowchart( parent=self, name="LAMMPS", namespace=namespace ) self._initialization_node = None self._trajectory = [] self._data = {} self._results = {} # Sotrage for computational and timing results super().__init__( flowchart=flowchart, title="LAMMPS", extension=extension, logger=logger ) @property def version(self): """The semantic version of this module.""" return lammps_step.__version__ @property def git_revision(self): """The git version of this module.""" return lammps_step.__git_revision__ @property def results(self): """The storage for results.""" return self._results
[docs] @staticmethod def box_to_cell(lx, ly, lz, xy, xz, yz): """Convert the LAMMPS box definition to cell parameters.""" if xy == 0 and xz == 0 and yz == 0: a = lx b = ly c = lz alpha = 0.0 beta = 0.0 gamma = 0.0 else: a = lx b = sqrt(ly**2 + xy**2) c = sqrt(lz**2 + xz**2 + yz**2) alpha = degrees(acos((xy * xz + lx * yz) / (b * c))) beta = degrees(acos(xz / c)) gamma = degrees(acos(xy / b)) return (a, b, c, alpha, beta, gamma)
[docs] @staticmethod def cell_to_box(a, b, c, alpha, beta, gamma): """Convert cell parameters to the LAMMPS box.""" if alpha == 90 and beta == 90 and gamma == 90: lx = a ly = b lz = c xy = xz = yz = 0.0 else: lx = a xy = b * cos(radians(gamma)) xz = c * cos(radians(beta)) ly = sqrt(b**2 - xy**2) yz = (b * c * cos(radians(alpha)) - xy * xz) / ly lz = sqrt(c**2 - xz**2 - yz**2) return (lx, ly, lz, xy, xz, yz)
[docs] def create_parser(self): """Setup the command-line / config file parser""" parser_name = self.step_type parser = seamm_util.getParser() # Remember if the parser exists ... this type of step may have been # found before parser_exists = parser.exists(parser_name) # Create the standard options, e.g. log-level result = super().create_parser(name=parser_name) if parser_exists: return result # LAMMPS specific options parser.add_argument( parser_name, "--ncores", default="available", help=( "The maximum number of cores to use for LAMMPS. " "Default: all available cores." ), ) parser.add_argument( parser_name, "--atoms-per-core", type=int, default="1000", help="the optimal number of atoms per core for LAMMPS", ) parser.add_argument( parser_name, "--html", action="store_true", help="whether to write out html files for graphs, etc.", ) if False: parser.add_argument( parser_name, "--modules", nargs="*", default=None, help="the environment modules to load for LAMMPS", ) parser.add_argument( parser_name, "--gpu-modules", nargs="*", default=None, help="the environment modules to load for the GPU version of LAMMPS", ) parser.add_argument( parser_name, "--lammps-path", default=None, help="the path to the LAMMPS executables", ) parser.add_argument( parser_name, "--lammps-serial", default="lmp_serial", help="the serial version of LAMMPS", ) parser.add_argument( parser_name, "--lammps-mpi", default="lmp_mpi", help="the mpi version of LAMMPS", ) parser.add_argument( parser_name, "--cmd-args", default="", help="the command-line arguments for LAMMPS, e.g. '-k on'", ) parser.add_argument( parser_name, "--gpu-cmd-args", default="", help=( "the command-line arguments for GPU version of LAMMPS, e.g. '-k on'" ), ) parser.add_argument( parser_name, "--mpiexec", default="mpiexec", help="the mpi executable" ) return result
[docs] def set_id(self, node_id): """Set the id for node to a given tuple""" self._id = node_id # and set our subnodes self.subflowchart.set_ids(self._id) return self.next()
[docs] def description_text(self, P=None): """Return a short description of this step. Return a nicely formatted string describing what this step will do. Keyword arguments: P: a dictionary of parameter values, which may be variables or final values. If None, then the parameters values will be used as is. """ self.subflowchart.root_directory = self.flowchart.root_directory # Get the first real node node = self.subflowchart.get_node("1").next() text = self.header + "\n\n" while node is not None: try: text += __(node.description_text(), indent=3 * " ").__str__() except Exception as e: print( "Error describing LAMMPS flowchart: {} in {}".format( str(e), str(node) ) ) self.logger.critical( "Error describing LAMMPS flowchart: {} in {}".format( str(e), str(node) ) ) raise except: # noqa: E722 print( "Unexpected error describing LAMMPS flowchart: {} in {}".format( sys.exc_info()[0], str(node) ) ) self.logger.critical( "Unexpected error describing LAMMPS flowchart: {} in {}".format( sys.exc_info()[0], str(node) ) ) raise text += "\n" node = node.next() return text
[docs] def run(self): """Run a LAMMPS simulation""" system_db = self.get_variable("_system_db") configuration = system_db.system.configuration n_atoms = configuration.n_atoms if n_atoms == 0: self.logger.error("LAMMPS run(): there is no structure!") raise RuntimeError("LAMMPS run(): there is no structure!") # Initialize storage self._results = {} next_node = super().run(printer) # Get the options o = self.options global_options = self.global_options # Whether to run parallel and if so, how many mpi processes if global_options["parallelism"] in ("any", "mpi"): np = n_atoms // o["atoms_per_core"] + 1 if o["ncores"] != "available": np = min(np, int(o["ncores"])) if global_options["ncores"] != "available": np = min(np, int(global_options["ncores"])) else: np = 1 # Print headers and get to work printer.important(self.header) self.subflowchart.root_directory = self.flowchart.root_directory files = {} # Get the first real node node = self.subflowchart.get_node("1").next() extras = {} history_nodes = [] # Create overall directory for the lammps step os.makedirs(self.directory, exist_ok=True) files = {} while node is not None: if isinstance(node, lammps_step.Initialization): initialization_header, eex = self._get_node_input( node=node, extras={"read_data": True} ) ( structure_data, pair_table, bond_table, angle_table, dihedral_table, ) = self.structure_data(eex) files["structure.dat"] = structure_data if bond_table != "": files["tabulated_bonds.dat"] = bond_table if angle_table != "": files["tabulated_angles.dat"] = angle_table if dihedral_table != "": files["tabulated_dihedrals.dat"] = dihedral_table self.logger.debug("structure.dat:\n" + files["structure.dat"]) self._initialization_node = node files["input.dat"] = copy.deepcopy(initialization_header) # Find the bond & angle types as needed for shake/rattle P = node.parameters.current_values_to_dict( context=seamm.flowchart_variables._data ) shake = self.shake_fix(P, eex) if shake != "": extras["shake"] = shake else: P = node.parameters.current_values_to_dict( context=seamm.flowchart_variables._data ) if "run_control" not in P or "Until" not in P["run_control"]: history_nodes.append(node) else: if len(history_nodes) > 0: # if imcccc files = self._prepare_input( files, nodes=history_nodes, read_restart=False, write_restart=True, extras=extras, ) files = self._execute_single_sim(files, np=np) self.analyze(nodes=history_nodes) self._trajectory = [] iteration = 0 extras["nsteps"] = 666 while True: extras["nsteps"] = round(1.5 * extras["nsteps"]) control_properties = {} for prp in P["control_properties"]: k = prp[0] control_properties[k] = { "accuracy": float(prp[1][0].strip("%")), "units": prp[1][1], "enough_accuracy": False, } P = node.parameters.current_values_to_dict( context=seamm.flowchart_variables._data ) files = self._prepare_input( files, nodes=node, iteration=iteration, read_restart=True, write_restart=True, extras=extras, ) files = self._execute_single_sim(files, np=np) # Analyze the results analysis = self.analyze(nodes=node) node_id = node._id[1] for prp, v in analysis[node_id].items(): if v["short_production"] is False: if v["few_neff"] is False: accuracy = control_properties[prp]["accuracy"] / 100 dof = v["n_sample"] mean = v["mean"] ci = stats.t.interval(0.95, dof - 1, loc=0, scale=1) interval = (ci[1] - ci[0]) * v["sem"] if abs(interval / mean) < accuracy: control_properties[prp][ "enough_accuracy" ] = True enough_acc = [ v["enough_accuracy"] for prp, v in control_properties.items() ] if all(enough_acc) is True: history_nodes = [] initialization_header, eex = self._get_node_input( node=self._initialization_node, extras={"read_data": False}, ) files["input.dat"] = copy.deepcopy(initialization_header) files["input.dat"].append( "read_restart %s" % (os.path.join(files["restart"]["filename"])) ) self._trajectory = [] break iteration = iteration + 1 node = node.next() if len(history_nodes) == 0: node_initialization = self.subflowchart.get_node("1").next() if node_initialization is None: raise TypeError( "The initial node in a LAMMPS workflow should be an ", "initialization node", ) if isinstance(node_initialization, lammps_step.Initialization) is False: raise TypeError( "The initial node in a LAMMPS workflow should be an ", "initialization node", ) # base = "lammps_substep_%s_iter_%d" % (node_initialization._id[1], 0) base = "lammps" restart = base + ".restart.*" dump = base + ".dump" new_input_data = [] new_input_data.append("run 0") new_input_data.append(f"write_restart {restart}") if configuration.periodicity == 0: new_input_data.append( f"write_dump all custom {dump} id xu yu zu vx vy vz" " modify flush yes sort id" ) else: new_input_data.append( f"write_dump all custom {dump} id xsu ysu zsu vx vy vz" " modify flush yes sort id" ) new_input_data.append("") new_input_data.append("info computes fixes dumps log out") files["input.dat"] += "\n".join(new_input_data) self.logger.debug("input.dat:\n" + files["input"]["data"]) files = self._execute_single_sim(files, np=np) if len(history_nodes) > 0: files = self._prepare_input( files, nodes=history_nodes, read_restart=False, write_restart=True, extras=extras, ) files = self._execute_single_sim(files, np=np) self.analyze(nodes=history_nodes) self._trajectory = [] dump_file = Path(self.directory) / "final.dump" if dump_file.exists: try: self.read_dump(dump_file) except Exception as e: printer.normal("Warning: unable to read the LAMMPS dumpfile") logger.warning(f"The was a problem reading the LAMMPS dumpfile: {e}") logger.warning(traceback.format_exc()) else: printer.normal("Warning: there is no 'dump' file from LAMMPS") printer.normal("") return next_node
def _execute_single_sim(self, files, np=1, return_files=None): """ Step #1: Execute input file """ return_files = [ "summary_*.txt", "trajectory_*.seamm_trj", "*.trj", "*.restart.*", "*.dump", "*.dump.*", "*.log", "*.dat", "log.lammps", "log.cite", "run_lammps", ] # Check for already having run path = Path(self.directory) / "success.dat" if path.exists(): result = {} path = Path(self.directory) / "log.cite" if path.exists(): result["log.cite"] = { "data": path.read_text(), } path = Path(self.directory) / "stdout.txt" if path.exists(): result["stdout"] = path.read_text() result["stderr"] = "" else: # Set up the computational limits and get the computational enviroment cl = {"NTASKS": np} ce = seamm_exec.computational_environment(cl) executor = self.flowchart.executor # Read configuration file for LAMMPS if it exists executor_type = executor.name full_config = configparser.ConfigParser() ini_dir = Path(self.global_options["root"]).expanduser() path = ini_dir / "lammps.ini" # If the config file doesn't exists, get the default if not path.exists(): resources = importlib.resources.files("lammps_step") / "data" ini_text = (resources / "lammps.ini").read_text() txt_config = Configuration(path) txt_config.from_string(ini_text) # Work out the conda info needed txt_config.set_value("local", "conda", os.environ["CONDA_EXE"]) txt_config.set_value("local", "conda-environment", "seamm-lammps") txt_config.save() printer.normal(f"Wrote the LAMMPS configuration file to {path}") printer.normal("") full_config.read(ini_dir / "lammps.ini") if executor_type not in full_config: path = shutil.which("lmp") mpi_path = shutil.which("mpirun") if path is None or mpi_path is None: raise RuntimeError( f"No section for '{executor_type}' in LAMMPS ini file " f"({ini_dir / 'lammps.ini'}), nor in the defaults, nor " "in the path!" ) else: txt_config = Configuration(path) txt_config.add_section(executor_type) txt_config.set_value(executor_type, "installation", "local") txt_config.set_value( executor_type, "code", f"{mpi_path} -np {{NTASKS}} {path}", ) txt_config.set_value( executor_type, "python", f"mpirun -np {{NTASKS}} {shutil.which('python')}", ) txt_config.save() printer.normal(f"Wrote the LAMMPS configuration file to {path}") printer.normal("") full_config.read(ini_dir / "lammps.ini") full_config.set(executor_type, "code", str(path)) config = dict(full_config.items(executor_type)) # Use the matching version of the seamm-mopac image by default. config["version"] = self.version executor_type = executor.name if executor_type not in full_config: raise RuntimeError( f"No section for '{executor_type}' in LAMMPS ini file " f"({ini_dir / 'lammps.ini'})" ) config = dict(full_config.items(executor_type)) # Setup the command lines cmd = [] if "run_lammps" in files: cmd = ["{python}", "run_lammps"] if ( "GPUS" not in ce and "cmd_args" in config and config["cmd_args"] != "" ): cmd.extend(["--cmd-args", config["cmd_args"]]) if ( "GPUS" in ce and "gpu_cmd_args" in config and config["gpu_cmd_args"] != "" ): cmd.extend(["--cmd-args", config["gpu_cmd_args"]]) else: cmd = ["{code}"] if ( "GPUS" not in ce and "cmd_args" in config and config["cmd_args"] != "" ): cmd.extend(config["cmd_args"].split()) if ( "GPUS" in ce and "gpu_cmd_args" in config and config["gpu_cmd_args"] != "" ): cmd.extend(config["gpu_cmd_args"].split()) cmd.extend(["-in", "input.dat"]) if "NGPUS" in ce: printer.important( f" LAMMPS running with {np} processes and {ce['NGPUS']} gpus." ) else: printer.important(f" LAMMPS using MPI with {np} processes.") printer.important("") cmd.extend([">", "stdout.txt", "2>", "stderr.txt"]) result = executor.run( cmd=cmd, config=config, directory=self.directory, files=files, return_files=return_files, in_situ=True, shell=True, ce=ce, ) if not result: self.logger.error("There was an error running LAMMPS") return None self.logger.debug("\n" + pprint.pformat(result)) f = os.path.join(self.directory, "stdout.txt") with open(f, mode="w") as fd: fd.write(result["stdout"]) # Add the citations, getting the version from stdout and any citations if "log.cite" in result: self._add_lammps_citations( result["stdout"], cite=result["log.cite"]["data"] ) else: self._add_lammps_citations(result["stdout"]) initialization_header, eex = self._get_node_input( node=self._initialization_node, extras={"read_data": False} ) files["input.dat"] = copy.deepcopy(initialization_header) # Write a small file to say that LAMMPS ran successfully, so cancel # skip if rerunning. path = Path(self.directory) / "success.dat" path.write_text("success") return files def _prepare_input( self, files, nodes=None, iteration=0, read_restart=False, write_restart=False, extras=None, ): _, configuration = self.get_system_configuration() python_script = None postscript = None if isinstance(nodes, list) is False: node_ids = [nodes._id[1]] todo = self._get_node_input(node=nodes, extras=extras) new_input_data = todo["script"] if todo["postscript"] is not None: postscript = todo["postscript"] if todo["use python"] and "python script" in todo: python_script = todo["python script"] else: node_ids = [] new_input_data = [] for n in nodes: node_ids.append(n._id[1]) todo = self._get_node_input(node=n, extras=extras) new_input_data += todo["script"] if todo["postscript"] is not None: postscript = todo["postscript"] if todo["use python"] and "python script" in todo: python_script = todo["python script"] # base = "lammps_substep_%s_iter_%d" % ("_".join(node_ids), iteration) # restart = base + ".restart.*" # if read_restart: # new_input_data.insert( # 0, "read_restart %s" % (files["restart"]["filename"]) # ) # if write_restart: # new_input_data.append(f"write_restart {restart}") if postscript is None: if configuration.periodicity == 0: new_input_data.append( "write_dump all custom final.dump id xu yu zu vx vy vz" " modify flush yes sort id" ) else: new_input_data.append( "write_dump all custom final.dump id xsu ysu zsu vx vy vz" " modify flush yes sort id" ) new_input_data.append("") new_input_data.append("") new_input_data.append("info computes fixes dumps out log") files["input.dat"] += new_input_data files["input.dat"] = "\n".join(files["input.dat"]) self.logger.debug("input.dat:\n" + files["input.dat"]) if postscript is not None: if configuration.periodicity == 0: postscript.append( "write_dump all custom final.dump id xu yu zu vx vy vz" " modify flush yes sort id" ) else: postscript.append( "write_dump all custom final.dump id xsu ysu zsu vx vy vz" " modify flush yes sort id" ) postscript.append("") postscript.append("info computes fixes dumps out log") files["input_post.dat"] = "\n".join(postscript) if python_script is not None: files["run_lammps"] = python_script return files def _get_node_input(self, node=None, extras=None): try: ret = node.get_input(extras=extras) except Exception as e: print("Error running LAMMPS flowchart: {} in {}".format(str(e), str(node))) self.logger.critical( "Error running LAMMPS flowchart: {} in {}".format(str(e), str(node)) ) raise except: # noqa: E722 print( "Unexpected error running LAMMPS flowchart: {} in {}".format( sys.exc_info()[0], str(node) ) ) self.logger.critical( "Unexpected error running LAMMPS flowchart: {} in {}".format( sys.exc_info()[0], str(node) ) ) raise return ret
[docs] def structure_data(self, eex, triclinic=False): """Create the LAMMPS structure file from the energy expression""" lines = [] pair_table = [] bond_table = [] angle_table = [] dihedral_table = [] lines.append("Structure file for LAMMPS generated by a MolSSI flowchart") lines.append("{:10d} atoms".format(eex["n_atoms"])) lines.append("{:10d} atom types".format(eex["n_atom_types"])) if "n_bonds" in eex and eex["n_bonds"] > 0: lines.append("{:10d} bonds".format(eex["n_bonds"])) lines.append("{:10d} bond types".format(eex["n_bond_types"])) if "n_angles" in eex and eex["n_angles"] > 0: lines.append("{:10d} angles".format(eex["n_angles"])) lines.append("{:10d} angle types".format(eex["n_angle_types"])) if "n_torsions" in eex and eex["n_torsions"] > 0: lines.append("{:10d} dihedrals".format(eex["n_torsions"])) lines.append("{:10d} dihedral types".format(eex["n_torsion_types"])) if "n_oops" in eex and eex["n_oops"] > 0: lines.append("{:10d} impropers".format(eex["n_oops"])) lines.append("{:10d} improper types".format(eex["n_oop_types"])) # Find the box limits periodicity = eex["periodicity"] if periodicity == 3: a, b, c, alpha, beta, gamma = eex["cell"] lx, ly, lz, xy, xz, yz = LAMMPS.cell_to_box(a, b, c, alpha, beta, gamma) lines.append("{} {} xlo xhi".format(0.0, lx)) lines.append("{} {} ylo yhi".format(0.0, ly)) lines.append("{} {} zlo zhi".format(0.0, lz)) xy = xy if abs(xy) > 1.0e-06 else 0.0 xz = xz if abs(xy) > 1.0e-06 else 0.0 yz = yz if abs(xy) > 1.0e-06 else 0.0 if triclinic or xy > 0.0 or xz > 0.0 or yz > 0.0: lines.append("{} {} {} xy xz yz".format(xy, xz, yz)) else: x, y, z, index = eex["atoms"][0] xlo = xhi = x ylo = yhi = y zlo = zhi = z for x, y, z, index in eex["atoms"]: xlo = x if x < xlo else xlo xhi = x if x > xhi else xhi ylo = y if y < ylo else ylo yhi = y if y > yhi else yhi zlo = z if z < zlo else zlo zhi = z if z > zhi else zhi # Some extra space.... xlo -= 10.0 xhi += 10.0 ylo -= 10.0 yhi += 10.0 zlo -= 10.0 zhi += 10.0 lines.append("{} {} xlo xhi".format(xlo, xhi)) lines.append("{} {} ylo yhi".format(ylo, yhi)) lines.append("{} {} zlo zhi".format(zlo, zhi)) # the atoms and their masses, etc. lines.append("") lines.append("Atoms") lines.append("") if "charges" in eex: if "molecule" in eex: for i, mol, xyz_index, q in zip( range(1, eex["n_atoms"] + 1), eex["molecule"], eex["atoms"], eex["charges"], ): x, y, z, index = xyz_index lines.append( f"{i:6d} {mol + 1:6d} {index:6d} {q:6.3f} {x:12.7f} {y:12.7f} " f"{z:12.7f}" ) else: for i, xyz_index, q in zip( range(1, eex["n_atoms"] + 1), eex["atoms"], eex["charges"] ): x, y, z, index = xyz_index lines.append( f"{i:6d} {index:6d} {q:6.3f} {x:12.7f} {y:12.7f} {z:12.7f}" ) else: for i, xyz_index in enumerate(eex["atoms"]): x, y, z, index = xyz_index lines.append(f"{i+1:6d} {index:6d} {x:12.7f} {y:12.7f} {z:12.7f}") _, configuration = self.get_system_configuration() if configuration.atoms.have_velocities: lines.append("") lines.append("Velocities") lines.append("") for i, vxyz in enumerate( configuration.atoms.get_velocities(fractionals=False), start=1 ): vx, vy, vz = vxyz lines.append(f"{i:6d} {vx:12.7f} {vy:12.7f} {vz:12.7f}") lines.append("") lines.append("Masses") lines.append("") self._data["masses"] = [] for i, parameters in zip(range(1, eex["n_atom_types"] + 1), eex["masses"]): mass, itype = parameters lines.append("{:6d} {} # {}".format(i, mass, itype)) self._data["masses"].append(float(mass)) # nonbonds if "nonbond parameters" in eex: lines.append("") if len(eex["nonbond parameters"]) > eex["n_atom_types"]: lines.append("PairIJ Coeffs") else: lines.append("Pair Coeffs") lines.append("") i = 1 j = 1 for parameters in eex["nonbond parameters"]: form, values, types, parameters_type, real_types = parameters if form == "nonbond(9-6)": lines.append( "{:6d} {} {} # {} --> {}".format( i, values["eps"], values["rmin"], types[0], real_types[0] ) ) i += 1 elif form == "nonbond(12-6)": lines.append( "{:6d} {} {} # {} --> {}".format( i, values["eps"], values["sigma"], types[0], real_types[0] ) ) i += 1 elif form == "buckingham": line = "{:6d} {} {} {} {}".format( j, i, values["A"], values["rho"], values["C"] ) line += ( f" # {types[1]}-{types[0]} --> {real_types[1]}_{real_types[0]}" ) lines.append(line) if j == i: i += 1 j = 1 else: j += 1 # bonds if "n_bonds" in eex and eex["n_bonds"] > 0: lines.append("") lines.append("Bonds") lines.append("") for counter, tmp in zip(range(1, eex["n_bonds"] + 1), eex["bonds"]): i, j, index = tmp lines.append("{:6d} {:6d} {:6d} {:6d}".format(counter, index, i, j)) lines.append("") lines.append("Bond Coeffs") lines.append("") forms = set([v[0] for v in eex["bond parameters"]]) use_hybrid = len(forms) > 1 for counter, parameters in zip( range(1, eex["n_bond_types"] + 1), eex["bond parameters"] ): form, values, types, parameters_type, real_types = parameters if form == "quadratic_bond": function = "harmonic" if use_hybrid else "" line = f"{counter:6d} {function} " f"{values['K2']} {values['R0']}" elif form == "quartic_bond": function = "class2" if use_hybrid else "" line = ( f"{counter:6d} {function} {values['R0']} " f"{values['K2']} {values['K3']} {values['K4']}" ) line += ( f" # {types[0]}-{types[1]}" f" --> {real_types[0]}_{real_types[1]}" ) lines.append(line) # angles if "n_angles" in eex and eex["n_angles"] > 0: lines.append("") lines.append("Angles") lines.append("") for counter, tmp in zip(range(1, eex["n_angles"] + 1), eex["angles"]): i, j, k, index = tmp lines.append( "{:6d} {:6d} {:6d} {:6d} {:6d}".format(counter, index, i, j, k) ) lines.append("") lines.append("Angle Coeffs") lines.append("") quartic_function = "class2" if "n_bond-bond_types" in eex else "quartic" forms = set([v[0] for v in eex["angle parameters"]]) use_hybrid = len(forms) > 1 for counter, parameters in zip( range(1, eex["n_angle_types"] + 1), eex["angle parameters"] ): form, values, types, parameters_type, real_types = parameters if form == "quadratic_angle": function = "harmonic" if use_hybrid else "" line = ( f"{counter:6d} {function} " f"{values['K2']} {values['Theta0']}" ) elif form == "quartic_angle": function = quartic_function if use_hybrid else "" line = ( f"{counter:6d} {function} {values['Theta0']} " f"{values['K2']} {values['K3']} {values['K4']}" ) elif form == "simple_fourier_angle": function = "fourier/simple" if use_hybrid else "" line = f"{counter:6d} {function} {values['K']} -1 {values['n']}" elif form == "tabulated_angle": function = "table" if use_hybrid else "" key = f"{types[0]}-{types[1]}-{types[2]}" line = f"{counter:6d} {function} tabulated_angles.dat {key}" angle_table.extend(self.angle_table(key, values)) line += ( f" # {types[0]} {types[1]} {types[2]}" f" --> {real_types[0]} {real_types[1]} {real_types[2]}" ) lines.append(line) # bond-bond coefficients, which must match angles in order & number if "n_bond-bond_types" in eex: lines.append("") lines.append("BondBond Coeffs") lines.append("") for counter, parameters, angles in zip( range(1, eex["n_bond-bond_types"] + 1), eex["bond-bond parameters"], eex["angle parameters"], ): form, values, types, parameters_type, real_types = parameters angle_form = angles[0] if angle_form == "quartic_angle": function = "class2" if use_hybrid else "" lines.append( "{:6d} {} {} {} {}".format( counter, function, values["K"], values["R10"], values["R20"], ) + " # {}-{}-{} --> {}-{}-{}".format( types[0], types[1], types[2], real_types[0], real_types[1], real_types[2], ) ) else: lines.append( "{:6d} skip".format(counter) + " # {}-{}-{} --> {}-{}-{}".format( types[0], types[1], types[2], real_types[0], real_types[1], real_types[2], ) ) # bond-angles coefficients, which must match angles in order & # number lines.append("") lines.append("BondAngle Coeffs") lines.append("") for counter, parameters, angles in zip( range(1, eex["n_bond-angle_types"] + 1), eex["bond-angle parameters"], eex["angle parameters"], ): form, values, types, parameters_type, real_types = parameters angle_form = angles[0] if angle_form == "quartic_angle": function = "class2" if use_hybrid else "" lines.append( "{:6d} {} {} {} {} {}".format( counter, function, values["K12"], values["K23"], values["R10"], values["R20"], ) + " # {}-{}-{} --> {}-{}-{}".format( types[0], types[1], types[2], real_types[0], real_types[1], real_types[2], ) ) else: lines.append( "{:6d} skip".format(counter) + " # {}-{}-{} --> {}-{}-{}".format( types[0], types[1], types[2], real_types[0], real_types[1], real_types[2], ) ) # torsions if "n_torsions" in eex and eex["n_torsions"] > 0: lines.append("") lines.append("Dihedrals") lines.append("") for counter, tmp in zip(range(1, eex["n_torsions"] + 1), eex["torsions"]): i, j, k, l, index = tmp lines.append( "{:6d} {:6d} {:6d} {:6d} {:6d} {:6d}".format( counter, index, i, j, k, l ) ) lines.append("") lines.append("Dihedral Coeffs") lines.append("") forms = set([v[0] for v in eex["torsion parameters"]]) use_hybrid = len(forms) > 1 for counter, parameters in zip( range(1, eex["n_torsion_types"] + 1), eex["torsion parameters"] ): form, values, types, parameters_type, real_types = parameters if form == "torsion_1": KPhi = values["KPhi"] n = values["n"] Phi0 = values["Phi0"] # Discover form is # KPhi * [1 + cos(n*Phi - Phi0)] # with trans = 180 # # For ethane, Phi0 = 0 so at Phi=180 E is min. Correct # Lammps for is # KPhi * [1 + d*cos(n*Phi)] # with trans = 180 # # Again for ethane, d=+1 and at Phi=180, E is min. # # Phi0 = 0 ==> d=+1 # Phi0 = 180 ==> d=-1 if float(Phi0) == 0.0: d = "-1" elif float(Phi0) == 180.0: d = "+1" else: raise RuntimeError( "LAMMPS cannot handle Phi0 = {}".format(Phi0) ) function = "harmonic" if use_hybrid else "" line = f"{counter:6d} {function} {KPhi} {d} {n}" elif form == "torsion_3": function = "class2" if use_hybrid else "" line = ( f"{counter:6d} {function} " f"{values['V1']} {values['Phi0_1']} " f"{values['V2']} {values['Phi0_2']} " f"{values['V3']} {values['Phi0_3']} " ) elif form == "torsion_opls": function = "opls" if use_hybrid else "" line = ( f"{counter:6d} {function} " f"{values['V1']} " f"{values['V2']} " f"{values['V3']} " f"{values['V4']} " ) line += ( f" # {types[0]}-{types[1]}-{types[2]}-{types[3]} " f"--> {real_types[0]}-{real_types[1]}-" f"{real_types[2]}-{real_types[3]}" ) lines.append(line) # middle bond-torsion_3 coefficients, which must match torsions # in order & number if "n_middle_bond-torsion_3_types" in eex: lines.append("") lines.append("MiddleBondTorsion Coeffs") lines.append("") for counter, parameters, torsions in zip( range(1, eex["n_middle_bond-torsion_3_types"] + 1), eex["middle_bond-torsion_3 parameters"], eex["torsion parameters"], ): form, values, types, parameters_type, real_types = parameters torsion_form = torsions[0] if torsion_form == "torsion_3": function = "class2" if use_hybrid else "" lines.append( "{:6d} {} {} {} {} {}".format( counter, function, values["V1"], values["V2"], values["V3"], values["R0"], ) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) else: lines.append( "{:6d} skip".format(counter) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) # end bond-torsion_3 coefficients, which must match torsions # in order & number lines.append("") lines.append("EndBondTorsion Coeffs") lines.append("") for counter, parameters, torsions in zip( range(1, eex["n_end_bond-torsion_3_types"] + 1), eex["end_bond-torsion_3 parameters"], eex["torsion parameters"], ): form, values, types, parameters_type, real_types = parameters torsion_form = torsions[0] if torsion_form == "torsion_3": function = "class2" if use_hybrid else "" lines.append( "{:6d} {} {} {} {} {} {} {} {} {}".format( counter, function, values["V1_L"], values["V2_L"], values["V3_L"], values["V1_R"], values["V2_R"], values["V3_R"], values["R0_L"], values["R0_R"], ) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) else: lines.append( "{:6d} skip".format(counter) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) # angle-torsion_3 coefficients, which must match torsions # in order & number lines.append("") lines.append("AngleTorsion Coeffs") lines.append("") for counter, parameters, torsions in zip( range(1, eex["n_angle-torsion_3_types"] + 1), eex["angle-torsion_3 parameters"], eex["torsion parameters"], ): form, values, types, parameters_type, real_types = parameters torsion_form = torsions[0] if torsion_form == "torsion_3": function = "class2" if use_hybrid else "" lines.append( "{:6d} {} {} {} {} {} {} {} {} {}".format( counter, function, values["V1_L"], values["V2_L"], values["V3_L"], values["V1_R"], values["V2_R"], values["V3_R"], values["Theta0_L"], values["Theta0_R"], ) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) else: lines.append( "{:6d} skip".format(counter) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) # angle-angle-torsion_1 coefficients, which must match torsions # in order & number lines.append("") lines.append("AngleAngleTorsion Coeffs") lines.append("") for counter, parameters, torsions in zip( range(1, eex["n_angle-angle-torsion_1_types"] + 1), eex["angle-angle-torsion_1 parameters"], eex["torsion parameters"], ): form, values, types, parameters_type, real_types = parameters torsion_form = torsions[0] if torsion_form == "torsion_3": function = "class2" if use_hybrid else "" lines.append( "{:6d} {} {} {} {}".format( counter, function, values["K"], values["Theta0_L"], values["Theta0_R"], ) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) else: lines.append( "{:6d} skip".format(counter) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) # bond-bond_1_3 coefficients, which must match torsions # in order & number lines.append("") lines.append("BondBond13 Coeffs") lines.append("") for counter, parameters, torsions in zip( range(1, eex["n_bond-bond_1_3_types"] + 1), eex["bond-bond_1_3 parameters"], eex["torsion parameters"], ): form, values, types, parameters_type, real_types = parameters torsion_form = torsions[0] if torsion_form == "torsion_3": function = "class2" if use_hybrid else "" lines.append( "{:6d} {} {} {} {}".format( counter, function, values["K"], values["R10"], values["R30"], ) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) else: lines.append( "{:6d} skip".format(counter) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) # out-of-planes if "n_oops" in eex and eex["n_oops"] > 0: lines.append("") lines.append("Impropers") lines.append("") # Need forms to reorder impropers.... form = {i: f[0] for i, f in enumerate(eex["oop parameters"], start=1)} for counter, tmp in zip(range(1, eex["n_oops"] + 1), eex["oops"]): i, j, k, l, index = tmp if form[index] == "improper_opls": lines.append( "{:6d} {:6d} {:6d} {:6d} {:6d} {:6d}".format( counter, index, j, i, k, l ) ) else: lines.append( "{:6d} {:6d} {:6d} {:6d} {:6d} {:6d}".format( counter, index, i, j, k, l ) ) lines.append("") lines.append("Improper Coeffs") lines.append("") for counter, parameters in zip( range(1, eex["n_oop_types"] + 1), eex["oop parameters"] ): form, values, types, parameters_type, real_types = parameters if form == "wilson_out_of_plane": lines.append( "{:6d} {} {}".format(counter, values["K"], values["Chi0"]) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) elif form == "improper_opls": # divide by two because OPLS uses V2/2 and CVS V. lines.append( f"{counter:6d} {float(values['V2'])/2:.4f} -1 2 " f"# {types[0]}-{types[1]}-{types[2]}-{types[3]} --> " f"{real_types[0]}-{real_types[1]}-{real_types[2]}-" f"{real_types[3]}" ) else: raise RuntimeError(f"Can't handle oop form '{form}'") # angle-angle if "n_angle-angle_types" in eex: lines.append("") lines.append("AngleAngle Coeffs") lines.append("") for counter, parameters in zip( range(1, eex["n_angle-angle_types"] + 1), eex["angle-angle parameters"], ): form, values, types, parameters_type, real_types = parameters lines.append( "{:6d} {} {} {} {} {} {}".format( counter, values["K1"], values["K2"], values["K3"], values["Theta10"], values["Theta20"], values["Theta30"], ) + " # {}-{}-{}-{} --> {}-{}-{}-{}".format( types[0], types[1], types[2], types[3], real_types[0], real_types[1], real_types[2], real_types[3], ) ) lines.append("") lines.append("") return ( "\n".join(lines), "\n".join(pair_table), "\n".join(bond_table), "\n".join(angle_table), "\n".join(dihedral_table), )
[docs] def analyze(self, indent="", nodes=None, **kwargs): """Analyze the output of the calculation""" if isinstance(nodes, list) is False: nodes = [nodes] ret = {node._id[1]: None for node in nodes} run_dir = Path(self.directory) # Divide the log file into sections for the steps log_file = run_dir / "log.lammps" if not log_file.is_file(): raise RuntimeError(f"Log file {log_file} is missing!") lines = log_file.read_text().split("\n") sections = {} section = None for line in lines: if line.startswith("# Step "): section = line[2:] sections[section] = [] elif section is not None: sections[section].append(line) if line.startswith("Total wall time:"): try: h, m, s = line.split()[3].split(":") _time = 3600 * float(h) + 60 * float(m) + float(s) self.results["t_lammps_wall"] = _time except Exception as _e: print(f"Wall time exception {_e}") for node in nodes: for value in node.description: printer.important(value) printer.important(" ") subdir = run_dir / str(node._id[-1]) # Move any old style files id_str = "_".join(str(e) for e in node._id) paths = sorted(run_dir.glob(f"*trajectory*{id_str}*.seamm_trj")) for path in paths: ensemble = str(path).split("_")[1] path.rename(subdir / f"{ensemble}_state.trj") paths = sorted(run_dir.glob(f"*summary*{id_str}*.seamm_trj")) for path in paths: ensemble = str(path).split("_")[1] path.rename(subdir / f"{ensemble}_summary.trj") # Find the state trajectory, if any paths = sorted(subdir.glob("*state.trj")) if len(paths) > 1: raise RuntimeError(f"More than one state.trj file for {id_str}.") elif len(paths) == 1: control_properties = lambda x: x not in ["tstep"] # noqa: E731 node_data, table = self.analyze_trajectory( paths[0], control_properties=control_properties, node=node ) # Save the state data as JSON data_file = subdir / "state.json" with data_file.open("w") as fd: json.dump( node_data, fd, indent=4, cls=CompactJSONEncoder, sort_keys=True ) # Get just the values from the node data values = {k: v["mean"] for k, v in node_data.items()} # And the other key values for k, v in node_data.items(): for key in ("stderr", "tau", "inefficiency", "n_samples"): if key in v: values[f"{k},{key}"] = v[key] else: node_data = None values = {} table = None node.analyze( data=values, properties=node_data, table=table, output=sections[node.header], ) ret[node._id[1]] = node_data return ret
[docs] def analyze_trajectory( self, path, sampling_rate=20, control_properties=None, node=None, ): """Read a trajectory file and do the statistical analysis""" write_html = "html" in self.options and self.options["html"] results = {} table = { "Property": [], "Value": [], " ": [], "StdErr": [], "Units": [], "convergence": [], "tau": [], "inefficiency": [], } # Process the trajectory data # temporary until we sort out multiple runs self._trajectory = [] with path.open() as fd: file_data = pandas.read_csv( fd, sep=" ", header=0, comment="!", usecols=control_properties, index_col="t", ) self._trajectory.append(file_data.iloc[:-1]) dt = lammps_step.from_lammps_units( file_data.index[1] - file_data.index[0], "fs" ) dt_fs = dt.m_as("fs") data = pandas.concat(self._trajectory) data = data.reset_index(drop=True) data.index *= dt_fs self.logger.debug("Columns: {}".format(data.columns)) self.logger.debug(" Types:\n{}".format(data.dtypes)) printer.normal(f" Analysis of {path.name}\n") # Work out the time step, rather than give the whole vector t = data.index t_units = "fs" len_trj = (len(t) - 1) * dt_fs if len_trj >= 4000000000: t_units = "ms" elif len_trj >= 4000000: t_units = "ns" elif len_trj >= 4000: t_units = "ps" t_max = float((len(t) - 1) * dt.m_as(t_units)) for column in data.columns: if "Unnamed:" in column: continue meta_column = column.rstrip("0123456789") if meta_column in LAMMPS.display_title: meta_title = LAMMPS.display_title[meta_column] elif column in LAMMPS.display_title: meta_title = LAMMPS.display_title[column] else: meta_title = f"unknown: {column}" if meta_column in LAMMPS.display_units: meta_units = LAMMPS.display_units[meta_column] elif column in LAMMPS.display_units: meta_units = LAMMPS.display_units[column] else: meta_units = "???" have_warning = False have_acf_warning = False # Ignore first point, t=0, cause it might not be right. yy = data[column].to_numpy()[1:] self.logger.info("Analyzing {}, nsamples = {}".format(column, len(yy))) # compute indices of uncorrelated timeseries using pymbar # Their algorithm is quadratic in length of Y unless you # use 'nskip'. I set it so there are about 100 time origins, so # the convergence time is accurate to about 1%. nskip = yy.size // 100 + 1 conv, inefficiency, Neff_max = timeseries.detect_equilibration( yy, nskip=nskip ) self.logger.info( " converged in {} steps, inefficiency = {}, Neff_max = {}".format( conv, inefficiency, Neff_max ) ) if np.isnan(inefficiency) or np.isnan(Neff_max): # Apparently didn't converge! table["Property"].append(column) table["Value"].append("") table[" "].append("") table["StdErr"].append("") table["Units"].append("") table["convergence"].append("unconverged") table["tau"].append("") table["inefficiency"].append("") have_acf = False is_converged = False else: is_converged = True tau = dt_fs * (inefficiency - 1) / 2 if tau < dt_fs / 2: tau = dt_fs / 2 t0 = conv * dt_fs y_t_equil = yy[conv:] indices = timeseries.subsample_correlated_data( y_t_equil, g=inefficiency ) if len(indices) == 0: print("Problem with column " + column) print("yy") print(yy) print("y_t_equil") print(y_t_equil) print("indices") print(indices) continue y_n = y_t_equil[indices] n_samples = len(y_n) mean = float(y_n.mean()) std = float(y_n.std()) sem = std / sqrt(n_samples) # Get the autocorrelation function if len(y_t_equil) < 8: have_acf = False have_acf_warning = True acf_warning = "^" elif all(y_t_equil == y_t_equil[0]): # A constant value, so no ACF have_acf = False acf_warning = "" else: have_acf = True acf_warning = "" nlags = 4 * int(round(inefficiency + 0.5)) if nlags > int(len(y_t_equil) / 2): nlags = int(len(y_t_equil) / 2) with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) acf, confidence = stattools.acf( y_t_equil, nlags=nlags, alpha=0.05, fft=True, adjusted=False, ) results[column] = {} results[column]["mean"] = mean results[column]["stderr"] = sem results[column]["n_sample"] = n_samples results[column]["short_production"] = have_acf_warning results[column]["tau"] = float(tau) results[column]["inefficiency"] = float(inefficiency) results[column]["timestep"] = float(dt_fs) results[column]["rootname"] = path.stem if have_acf: results[column]["acf"] = acf.tolist() results[column]["acf_confidence"] = confidence.tolist() # Don't print or graph some properties, like heat flux if column in ("Jx", "Jy", "Jz"): results[column]["values"] = data[column].to_numpy() # Work out units on convergence time conv_units = "fs" t_conv = t0 if t0 >= 1000000000: conv_units = "ms" t_conv = t0 / 1000000000 elif t0 >= 1000000: conv_units = "ns" t_conv = t0 / 1000000 elif t0 >= 1000: conv_units = "ps" t_conv = t0 / 1000 # Work out units on autocorrelation time tau_units = "fs" t_tau = tau if tau >= 1000000000: tau_units = "ms" t_tau = tau / 1000000000 elif tau >= 1000000: tau_units = "ns" t_tau = tau / 1000000 elif tau >= 1000: tau_units = "ps" t_tau = tau / 1000 if n_samples < 100: have_warning = True warn = "*" else: warn = " " results[column]["few_neff"] = have_warning table["Property"].append(f"{column}{warn}") table["Value"].append(f"{mean:.3f}") table[" "].append("±") table["StdErr"].append(f"{sem:.3f}") table["Units"].append(meta_units) table["convergence"].append(f"{t_conv:.2f} {conv_units}") table["tau"].append(f"{t_tau:.1f} {tau_units}{acf_warning}") table["inefficiency"].append(f"{inefficiency:.1f}") # Create graphs of the property figure = self.create_figure( module_path=(self.__module__.split(".")[0], "seamm"), template="line.graph_template", title=meta_title, ) # The autocorrelation function if have_acf: plot_acf = figure.add_plot("acf") t_acf_units = "fs" len_acf = (len(acf) - 1) * dt_fs if len_acf >= 2000000000: t_acf_units = "ms" elif len_acf >= 2000000: t_acf_units = "ns" elif len_acf >= 2000: t_acf_units = "ps" x_acf_axis = plot_acf.add_axis( "x", label="Time ({})".format(t_acf_units) ) y_acf_axis = plot_acf.add_axis("y", label="acf", anchor=x_acf_axis) x_acf_axis.anchor = y_acf_axis # Put the fit to the autocorrelation time in first so the # subsequent trajectory trace sits in top ts = 0.0 fit = [1.0] for step in range(len(acf) - 1): ts += dt_fs fit.append(exp(-ts / tau)) plot_acf.add_trace( x_axis=x_acf_axis, y_axis=y_acf_axis, name="fit", x0=0, dx=dt.m_as(t_acf_units), xlabel="t", xunits=t_acf_units, y=fit, ylabel="fit", yunits="", color="gray", ) # the partly transparent error band yplus = [] yminus = [] t_acf = [] tmp = 0.0 for lower, upper in confidence: t_acf.append(tmp) yplus.append(upper) yminus.append(lower) tmp += dt.m_as(t_acf_units) plot_acf.add_trace( x_axis=x_acf_axis, y_axis=y_acf_axis, name="stderr", x=t_acf + t_acf[::-1], xlabel="t", xunits=t_acf_units, y=yplus + yminus[::-1], ylabel="stderr", yunits=meta_units, color="rgba(211,211,211,0.5)", fill="toself", ) # And the acf plot last plot_acf.add_trace( x_axis=x_acf_axis, y_axis=y_acf_axis, name="acf", x0=0, dx=dt.m_as(t_acf_units), xlabel="t", xunits=t_acf_units, y=list(acf), ylabel="acf", yunits="", color="red", ) # The property data over the trajectory y = list(data[column]) plot = figure.add_plot("trj") ylabel = meta_title if meta_units != "": ylabel += f" ({meta_units})" x_axis = plot.add_axis("x", label="Time ({})".format(t_units)) y_axis = plot.add_axis("y", label=ylabel, anchor=x_axis) x_axis.anchor = y_axis # Add the trajectory, error band and median value in that order so # stack in a nice order. # Add the trajectory plot.add_trace( x_axis=x_axis, y_axis=y_axis, name=column, x0=0, dx=dt.m_as(t_units), xlabel="t", xunits=t_units, y=list(y), ylabel=column, yunits=meta_units, color="#4dbd74", ) if is_converged: # the partly transparent error band t_min = t0 / dt_fs * dt.m_as(t_units) plot.add_trace( x_axis=x_axis, y_axis=y_axis, name="sem", x=[t_min, t_max, t_max, t_min], xlabel="t", xunits=t_units, y=[mean + sem, mean + sem, mean - sem, mean - sem], ylabel="sem", yunits=meta_units, color="rgba(211,211,211,0.5)", fill="toself", ) # and finally the median value so it is on top plot.add_trace( x_axis=x_axis, y_axis=y_axis, name="average", x=[t_min, t_max], xlabel="t", xunits=t_units, y=[mean, mean], ylabel="average", yunits=meta_units, color="black", ) if have_acf: figure.grid_plots("trj - acf") else: figure.grid_plots("trj") if node is None: path = Path(f"{path.stem}_{column}.graph") else: node_path = Path(node.directory) node_path.mkdir(parents=True, exist_ok=True) path = node_path / f"{column}.graph" figure.dump(path) if write_html: figure.template = "line.html_template" figure.dump(path.with_suffix(".html")) # Add citations for pymbar self.references.cite( raw=self._bibliography["mbar"], alias="pymbar-1", module="lammps_step", level=1, note="The main reference for pymbar.", ) self.references.cite( raw=self._bibliography["histogram"], alias="pymbar-2", module="lammps_step", level=2, note="The second citation for pymbar", ) self.references.cite( raw=self._bibliography["convergence"], alias="pymbar-3", module="lammps_step", level=2, note="The third citation for pymbar", ) return results, table
[docs] def shake_fix(self, P, eex): """Create the 'fix shake' line needed for handling waters and X-H. Parameters ---------- P : dict The parameters for the initialization step as a dict. eex : dict The energy expression for this calculation Returns ------- line : str The correct fix line for LAMMPS """ bond_types = {} angle_types = {} # Water models if P["rigid_waters"]: # waters = seamm_util.water_models.Water.find_waters(data.structure) # noqa: E501 waters = [] if len(waters) > 0: atoms = [] for i, j, k in waters: atoms.append(i) atoms.append(j) atoms.append(k) if "n_bonds" in eex and eex["n_bonds"] > 0: for i, j, index in eex["bonds"]: if i in atoms and j in atoms: bond_types[index] = 1 if "n_angles" in eex and eex["n_angles"] > 0: for i, j, k, index in eex["angles"]: if i in atoms and j in atoms and k in atoms: angle_types[index] = 1 # Fixing bond lengths of X-H bonds... if "n_bonds" in eex and eex["n_bonds"] > 0: fix_bonds = P["fix_XH_bond_lengths"] elements = eex["elements"] if fix_bonds == "CH": for i, j, index in eex["bonds"]: if (elements[i] == "C" and elements[j] == "H") or ( elements[i] == "H" and elements[j] == "C" ): bond_types[index] = 1 elif fix_bonds == "all": for i, j, index in eex["bonds"]: if elements[i] == "H" or elements[j] == "H": bond_types[index] = 1 # And the result is .... if len(bond_types) > 0: result = "fix {} all rattle 0.001 20 1000 b " for bond_type in bond_types.keys(): result += " " + str(bond_type) if len(angle_types) > 0: result += " a " for angle_type in angle_types.keys(): result += " " + str(angle_type) else: result = "" return result
[docs] def read_dump(self, dumpfile): """Read the LAMMPS dumpfile and update the system. Parameters ---------- dumpfile : str The filename (or path) to the dumpfile. """ self.logger.info("Reading dump file '{}'".format(dumpfile)) system_db = self.get_variable("_system_db") configuration = system_db.system.configuration periodicity = configuration.periodicity n_atoms = configuration.n_atoms section = "" section_lines = [] with open(dumpfile, "r") as fd: lineno = 0 for line in fd: line = line.strip() lineno += 1 if lineno == 1: if line[0:5] != "ITEM:": raise RuntimeError( "Error reading dump file '" + dumpfile + "': The " "first line is incorrect! (" + line + ")" ) section = line[6:].strip() section_lines = [] self.logger.debug(" section = " + section) continue if line[0:5] == "ITEM:": # end a section self.logger.debug(" processing section '{}'".format(section)) if "BOX BOUNDS" in section: if len(section.split()) == 8: xlo_bound, xhi_bound, xy = section_lines[0].split() ylo_bound, yhi_bound, xz = section_lines[1].split() zlo, zhi, yz = section_lines[2].split() xlo_bound = float(xlo_bound) xhi_bound = float(xhi_bound) ylo_bound = float(ylo_bound) yhi_bound = float(yhi_bound) zlo = float(zlo) zhi = float(zhi) xy = float(xy) xz = float(xz) yz = float(yz) xlo = xlo_bound - min(0.0, xy, xz, xy + xz) xhi = xhi_bound - max(0.0, xy, xz, xy + xz) ylo = ylo_bound - min(0.0, yz) yhi = yhi_bound - max(0.0, yz) cell = LAMMPS.box_to_cell( xhi - xlo, yhi - ylo, zhi - zlo, xy, xz, yz ) else: xlo, xhi = section_lines[0].split() ylo, yhi = section_lines[1].split() zlo, zhi = section_lines[2].split() xlo = float(xlo) xhi = float(xhi) ylo = float(ylo) yhi = float(yhi) zlo = float(zlo) zhi = float(zhi) cell = (xhi - xlo, yhi - ylo, zhi - zlo, 90, 90, 90) elif section == "NUMBER OF ATOMS": if int(section_lines[0]) != n_atoms: raise RuntimeError( "Number of atoms has changed! {} to {}".format( n_atoms, section_lines[0] ) ) elif "ATOMS" in section: xyz = [] vxyz = [] keys = section.split()[1:] if keys[1:4] == ["x", "y", "z"] or keys[1:4] == [ "xu", "yu", "zu", ]: fractional = False elif keys[1:4] == ["xs", "ys", "zs"] or keys[1:4] == [ "xsu", "ysu", "zsu", ]: fractional = True else: logger.error(f"Can't handle dump file, {keys=}") if len(keys) >= 7 and keys[4:7] == ["vx", "vy", "vz"]: have_velocities = True factor = lammps_step.from_lammps_units(1, "fs").magnitude factor = 1 / factor for tmp in section_lines: x, y, z, vx, vy, vz = tmp.split()[1:7] xyz.append([float(x), float(y), float(z)]) vxyz.append( [ factor * float(vx), factor * float(vy), factor * float(vz), ] ) else: have_velocities = False for tmp in section_lines: x, y, z = tmp.split()[1:4] xyz.append([float(x), float(y), float(z)]) section = line[6:].strip() section_lines = [] else: section_lines.append(line) # Clean up the last section xyz = [] vxyz = [] if "ATOMS" in section: self.logger.debug(" processing section '{}'".format(section)) self.logger.debug(" handling the atoms") keys = section.split()[1:] if keys[1:4] == ["x", "y", "z"] or keys[1:4] == ["xu", "yu", "zu"]: fractional = False elif keys[1:4] == ["xs", "ys", "zs"] or keys[1:4] == ["xsu", "ysu", "zsu"]: fractional = True else: logger.error(f"Can't handle dump file, {keys=}") if len(keys) >= 7 and keys[4:7] == ["vx", "vy", "vz"]: have_velocities = True factor = 1.0 / lammps_step.from_lammps_units(1, "fs").magnitude for tmp in section_lines: x, y, z, vx, vy, vz = tmp.split()[1:7] xyz.append([float(x), float(y), float(z)]) vxyz.append( [factor * float(vx), factor * float(vy), factor * float(vz)] ) else: have_velocities = False for tmp in section_lines: x, y, z = tmp.split()[1:4] xyz.append([float(x), float(y), float(z)]) if periodicity == 3: configuration.cell.parameters = cell configuration.atoms.set_coordinates(xyz, fractionals=fractional) if have_velocities: # LAMMPS only has Cartesian velocities configuration.atoms.set_velocities(vxyz, fractionals=False)
def _add_lammps_citations(self, text, cite=None): """Add the two main citations for LAMMPS, getting the version from stdout text. Parameters ---------- text : str The standard output from LAMMPS Returns ------- None """ # Add the JCP paper self.references.cite( raw=self._bibliography["PLIMPTON19951"], alias="lammps-jcp", module="lammps_step", level=1, note="The principle LAMMPS citation.", ) # And the citation to the LAMMPS code itself lines = text.splitlines() if len(lines) == 0: return line = lines[0] tmp = line.split(" ") if len(tmp) != 4: self.logger.info(f"Cannot get LAMMPS version: '{line}'") return month = tmp[2] year = tmp[3].rstrip(")") version = " ".join(tmp[1:]) try: template = string.Template(self._bibliography["lammps"]) citation = template.substitute(month=month, version=version, year=year) self.references.cite( raw=citation, alias="lammps-exe", module="lammps_step", level=1, note="The principle citation for the LAMMPS executable.", ) except Exception as e: printer.important(f"Exception in citation {type(e)}: {e}") printer.important(traceback.format_exc()) # If there is a log.cite file, process it if cite is not None: self.logger.debug("log.cite\n" + cite + "\n") bibliography = {} tmp = bibtexparser.loads(cite).entries_dict writer = bibtexparser.bwriter.BibTexWriter() for key, data in tmp.items(): self.logger.info(f" {key}") bibliography[key] = writer._entry_to_bibtex(data) self.logger.debug("Bibliography\n" + pprint.pformat(bibliography)) for entry in bibliography: if entry.lower() in ("commment",): continue self.references.cite( raw=bibliography[entry], alias=entry, module="lammps_step", level=1, note="LAMMPS citations from log.cite.", )
[docs] def angle_table(self, key, values): """Create a section of the tabulated angle file. Parameters ---------- key : str The name for this section in the file. values : {str: int, float, or str} The dictionary of the constants for this angle term. Returns ------- [str] A list of lines of the tabulated angle file. values looks like this:: { 'reference': '5', 'Eqn': 'K/8*(1-cos(n*Theta)) + A/(2*Rb*sin(Theta/2))**12', 'K': 278.4416826003824, 'n': 4, 'Rb': 1.606, 'A': 11.950286806883364, 'zero-shift': 0.001 } """ lines = [] data = {**values} if "reference" in data: del data["reference"] eqn = data.pop("Eqn") thetas, Es, dEs = tabulate_angle(eqn, data) lines.append(key) lines.append(f"N {len(thetas)}") lines.append("") for i, data in enumerate(zip(thetas, Es, dEs), start=1): theta, E, dE = data lines.append(f"{i:5d} {theta:9.4f} {E:40}{-dE:40}") lines.append("") return lines
[docs] def get_dump(self, dumpfile): """Read the LAMMPS dumpfile and return the data. Parameters ---------- dumpfile : str The filename (or path) to the dumpfile. """ self.logger.info("Reading dump file '{}'".format(dumpfile)) # system_db = self.get_variable("_system_db") # configuration = system_db.system.configuration # periodicity = configuration.periodicity # n_atoms = configuration.n_atoms result = { "timestep": [], "n_atoms": [], "cell": [], "data": [], } section = "" section_lines = [] with open(dumpfile, "r") as fd: lineno = 0 for line in fd: line = line.strip() lineno += 1 if lineno == 1: if line[0:5] != "ITEM:": raise RuntimeError( "Error reading dump file '" + dumpfile + "': The " "first line is incorrect! (" + line + ")" ) section = line[6:].strip() section_lines = [] self.logger.debug(" section = " + section) continue if line[0:5] == "ITEM:": # end a section self.logger.debug(" processing section '{}'".format(section)) if section == "TIMESTEP": result["timestep"].append(int(section_lines[0])) elif section == "NUMBER OF ATOMS": n_atoms = int(section_lines[0]) result["n_atoms"].append(n_atoms) elif "BOX BOUNDS" in section: if len(section.split()) == 8: xlo_bound, xhi_bound, xy = section_lines[0].split() ylo_bound, yhi_bound, xz = section_lines[1].split() zlo, zhi, yz = section_lines[2].split() xlo_bound = float(xlo_bound) xhi_bound = float(xhi_bound) ylo_bound = float(ylo_bound) yhi_bound = float(yhi_bound) zlo = float(zlo) zhi = float(zhi) xy = float(xy) xz = float(xz) yz = float(yz) xlo = xlo_bound - min(0.0, xy, xz, xy + xz) xhi = xhi_bound - max(0.0, xy, xz, xy + xz) ylo = ylo_bound - min(0.0, yz) yhi = yhi_bound - max(0.0, yz) cell = LAMMPS.box_to_cell( xhi - xlo, yhi - ylo, zhi - zlo, xy, xz, yz ) else: xlo, xhi = section_lines[0].split() ylo, yhi = section_lines[1].split() zlo, zhi = section_lines[2].split() xlo = float(xlo) xhi = float(xhi) ylo = float(ylo) yhi = float(yhi) zlo = float(zlo) zhi = float(zhi) cell = (xhi - xlo, yhi - ylo, zhi - zlo, 90, 90, 90) result["cell"].append(cell) elif "ATOMS" in section: keys = section.split()[2:] if "fields" not in result: result["fields"] = keys elif keys != result["fields"]: raise ValueError( f"Error reading dump file '{dumpfile}': The fields in " "the ATOMS section do not match the previous fields!\n" f" timestep: {result['timestep'][-1]}\n" f" previous: {result['fields']=}\n" f" current: {keys=}" ) tmp = [[None] * n_atoms for key in keys] for line in section_lines: values = line.split() _id = int(values[0]) - 1 for index, value in enumerate(values[1:]): key = keys[index] if key in ("mol", "proc", "procp1", "type"): value = int(value) elif key in ("element",): value = value.strip() else: value = float(value) tmp[index][_id] = value result["data"].append(tmp) section = line[6:].strip() section_lines = [] else: section_lines.append(line) # Clean up the last section if "ATOMS" in section: keys = section.split()[2:] if "fields" not in result: result["fields"] = keys elif keys != result["fields"]: raise ValueError( f"Error reading dump file '{dumpfile}': The fields in " "the ATOMS section do not match the previous fields!\n" f" timestep: {result['timestep'][-1]}\n" f" previous: {result['fields']=}\n" f" current: {keys=}" ) tmp = [[None] * n_atoms for key in keys] for line in section_lines: values = line.split() _id = int(values[0]) - 1 for index, value in enumerate(values[1:]): key = keys[index] if key in ("mol", "proc", "procp1", "type"): value = int(value) elif key in ("element",): value = value.strip() else: value = float(value) tmp[index][_id] = value result["data"].append(tmp) return result