Source code for psi4_step.psi4

# -*- coding: utf-8 -*-

"""Non-graphical part of the Psi4 step in a SEAMM flowchart
"""

import configparser
import importlib
import json
import logging
from pathlib import Path
import pprint
import shutil

import psi4_step
import seamm
import seamm_exec
import seamm_util.printing as printing
from seamm_util.printing import FormattedText as __

# In addition to the normal logger, two logger-like printing facilities are
# defined: 'job' and 'printer'. 'job' send output to the main job.out file for
# the job, and should be used very sparingly, typically to echo what this step
# will do in the initial summary of the job.
#
# 'printer' sends output to the file 'step.out' in this steps working
# directory, and is used for all normal output from this step.

logger = logging.getLogger(__name__)
job = printing.getPrinter()
printer = printing.getPrinter("Psi4")

pre_code = """\
def fix_multipoles(data):
    result = {}
    esp = []
    it = iter(data.items())
    for key, value in it:
        if 'PROP ' == key[0:5]:
            result[key[6:]] = value[0]
        elif 'SCF DIPOLE' == key:
            result[key] = value[0]
        elif 'CURRENT DIPOLE' == key:
            result[key] = value[0]
        elif '32-POLE' in key:
            tmp = []
            while True:
                value = 0.0 if abs(value) < 1.0e-10 else value
                tmp.append(value)
                if 'ZZZZZ' in key:
                    break
                key, value = next(it)
            result['32-POLE'] = tmp
        elif 'HEXADECAPOLE' in key:
            tmp = []
            while True:
                value = 0.0 if abs(value) < 1.0e-10 else value
                tmp.append(value)
                if 'ZZZZ' in key:
                    break
                key, value = next(it)
            result['HEXADECAPOLE'] = tmp
        elif 'OCTUPOLE' in key:
            tmp = []
            while True:
                value = 0.0 if abs(value) < 1.0e-10 else value
                tmp.append(value)
                if 'ZZZ' in key:
                    break
                key, value = next(it)
            result['OCTUPOLE'] = tmp
        elif 'QUADRUPOLE' in key:
            tmp = []
            while True:
                value = 0.0 if abs(value) < 1.0e-10 else value
                tmp.append(value)
                if 'ZZ' in key:
                    break
                key, value = next(it)
            result['QUADRUPOLE'] = tmp
        elif 'DIPOLE' in key:
            tmp = []
            while True:
                value = 0.0 if abs(value) < 1.0e-10 else value
                tmp.append(value)
                result[key] = value
                if 'Z' in key:
                    break
                key, value = next(it)
            result[key[0:-2]] = tmp
        elif 'ESP AT CENTER' in key:
            esp.append(value)
        else:
            result[key] = value

    if len(esp) > 0:
        result['ELECTROSTATIC POTENTIAL'] = esp
    return result
"""


[docs] def humanize(memory, suffix="B", kilo=1024): """ Scale memory to its proper format e.g: 1253656 => '1.20 MiB' 1253656678 => '1.17 GiB' """ if kilo == 1000: units = ["", "k", "M", "G", "T", "P"] elif kilo == 1024: units = ["", "Ki", "Mi", "Gi", "Ti", "Pi"] else: raise ValueError("kilo must be 1000 or 1024!") for unit in units: if memory < kilo: return f"{memory:.2f} {unit}{suffix}" memory /= kilo
[docs] def dehumanize(memory, suffix="B"): """ Unscale memory from its human readable form e.g: '1.20 MB' => 1200000 '1.17 GB' => 1170000000 """ units = { "": 1, "k": 1000, "M": 1000**2, "G": 1000**3, "P": 1000**4, "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Pi": 1024**4, } tmp = memory.split() if len(tmp) == 1: return memory elif len(tmp) > 2: raise ValueError("Memory must be <number> <units>, e.g. 1.23 GB") amount, unit = tmp amount = float(amount) for prefix in units: if prefix + suffix == unit: return int(amount * units[prefix]) raise ValueError(f"Don't recognize the units on '{memory}'")
[docs] class Psi4(seamm.Node): """ The non-graphical part of a Psi4 step in a flowchart. Attributes ---------- options : tuple It contains a two item tuple containing the populated namespace and the list of remaining argument strings. subflowchart : seamm.Flowchart A SEAMM Flowchart object that represents a subflowchart, if needed. parameters : Psi4Parameters The control parameters for Psi4. See Also -------- TkPsi4, Psi4, Psi4Parameters """ def __init__( self, flowchart=None, title="Psi4", namespace="org.molssi.seamm.psi4", extension=None, ): """A step for Psi4 in a SEAMM flowchart. You may wish to change the title above, which is the string displayed in the box representing the step in the flowchart. Parameters ---------- flowchart: seamm.Flowchart The non-graphical flowchart that contains this step. title: str The name displayed in the flowchart. namespace: The namespace for the plugins of the subflowchart extension: None Not yet implemented """ logger.debug("Creating Psi4 {}".format(self)) self.subflowchart = seamm.Flowchart( parent=self, name="Psi4", namespace=namespace ) super().__init__( flowchart=flowchart, title="Psi4", extension=extension, logger=logger ) self.parameters = psi4_step.Psi4Parameters() @property def version(self): """The semantic version of this module.""" return psi4_step.__version__ @property def git_revision(self): """The git version of this module.""" return psi4_step.__git_revision__
[docs] def create_parser(self): """Setup the command-line / config file parser""" # parser_name = 'psi4-step' parser_name = self.step_type parser = self.flowchart.parser # Remember if the parser exists ... this type of step may have been # found before parser_exists = parser.exists(parser_name) # Create the standard options, e.g. log-level result = super().create_parser(name=parser_name) if parser_exists: return result # Options for Psi4 parser.add_argument( parser_name, "--ncores", default="available", help="How many threads to use in Psi4", ) parser.add_argument( parser_name, "--memory", default="available", help=( "The maximum amount of memory to use for Psi4, which can be " "'all' or 'available', or a number, which may use k, Ki, " "M, Mi, etc. suffixes. Default: available." ), ) parser.add_argument( parser_name, "--memory-factor", default="90%", help=( "The amount of possible memory to use, typically about 90%% to" " allow for Psi4 not keeping track of all the memory used." ), ) return result
[docs] def set_id(self, node_id): """Set the id for node to a given tuple""" self._id = node_id # and set our subnodes self.subflowchart.set_ids(self._id) return self.next()
[docs] def description_text(self, P=None): """Create the text description of what this step will do. The dictionary of control values is passed in as P so that the code can test values, etc. Parameters ---------- P: dict An optional dictionary of the current values of the control parameters. Returns ------- description : str A description of the current step. """ self.subflowchart.root_directory = self.flowchart.root_directory # Get the first real node node = self.subflowchart.get_node("1").next() text = self.header + "\n\n" while node is not None: text += __(node.description_text(), indent=4 * " ").__str__() text += "\n" node = node.next() return text
[docs] def run(self): """Run a Psi4 step. Returns ------- next_node : seamm.Node The next node object in the flowchart. """ printer.important(self.header) printer.important("") # Add the main citation for Psi4 self.references.cite( raw=self._bibliography["doi:10.1063/5.0006002"], alias="psi4", module="psi4 step", level=1, note="The principle Psi4 citation.", ) system_db = self.get_variable("_system_db") configuration = system_db.system.configuration n_atoms = configuration.n_atoms if n_atoms == 0: self.logger.error("Psi4 run(): there is no structure!") raise RuntimeError("Psi4 run(): there is no structure!") # Access the options options = self.options seamm_options = self.global_options # Get the computational environment and set limits ce = seamm_exec.computational_environment() # Maximum number of threads n_threads = ce["NTASKS"] if options["ncores"] == "available": pass else: n_threads = int(options["ncores"]) if n_threads < 1: n_threads = 1 if seamm_options["ncores"] != "available": tmp = int(seamm_options["ncores"]) if tmp < n_threads: n_threads = tmp ce["NTASKS"] = n_threads # And memory if seamm_options["memory"] == "all": mem_limit = ce["MEM_PER_NODE"] elif seamm_options["memory"] == "available": # For the default, 'available', use in proportion to number of # cores used mem_limit = n_threads * ce["MEM_PER_CPU"] else: mem_limit = dehumanize(seamm_options["memory"]) if options["memory"] == "all": memory = ce["MEM_PER_NODE"] elif options["memory"] == "available": # For the default, 'available', use in proportion to number of # cores used memory = n_threads * ce["MEM_PER_CPU"] else: memory = dehumanize(options["memory"]) memory = min(memory, mem_limit) if "%" in options["memory_factor"]: factor = float(options["memory_factor"].rstrip("%")) / 100.0 else: factor = float(options["memory_factor"]) memory *= factor # Psi applies a minimum of 250 MiB min_memory = dehumanize("250 MiB") if min_memory > memory: memory = min_memory ce["MEM_PER_NODE"] = memory memory = humanize(memory, kilo=1024) # Work through the subflowchart to find out what to do. self.subflowchart.root_directory = self.flowchart.root_directory next_node = super().run(printer) # Get the first real node node0 = self.subflowchart.get_node("1").next() # Start the input data input_data = [] input_data.append("import json") input_data.append("import numpy as np") input_data.append("from pathlib import Path") input_data.append("import pprint") input_data.append("") input_data.append(f"memory {memory}") input_data.append("") input_data.append(pre_code) input_data.append("") # Put the structure into the input input_data.append(self._convert_structure(name="initial")) node = node0 while node is not None: text = node.get_input() input_data.append(text) input_data.append("clean()") input_data.append("clean_variables()") # input_data.append('clean_options()') node = node.next() # Write out the final structure input_data.append("") input_data.append("# Write the final structure to disk") input_data.append("molecule = get_active_molecule()") input_data.append("tmp = molecule.to_dict()") input_data.append("for item, value in tmp.items():") input_data.append(" if isinstance(value, np.ndarray):") input_data.append(" tmp[item] = value.tolist()") input_data.append("") input_data.append("with open('final_structure.json', 'w') as fd:") input_data.append(" json.dump(tmp, fd, sort_keys=True, indent=3)") files = {"input.dat": "\n".join(input_data)} self.logger.info("input.dat:\n" + files["input.dat"]) directory = Path(self.directory) directory.mkdir(parents=True, exist_ok=True) return_files = ["output.dat", "*.json", "*.cube"] # Check for already having run path = Path(self.directory) / "success.dat" if path.exists(): result = {} path = Path(self.directory) / "stdout.txt" if path.exists(): result["stdout"] = path.read_text() result["stderr"] = "" failed = False else: executor = self.flowchart.executor # Read configuration file for Psi4 if it exists executor_type = executor.name full_config = configparser.ConfigParser() ini_dir = Path(seamm_options["root"]).expanduser() path = ini_dir / "psi4.ini" if path.exists(): full_config.read(ini_dir / "psi4.ini") # If the section we need doesn't exists, get the default if not path.exists() or executor_type not in full_config: resources = importlib.resources.files("psi4_step") / "data" ini_text = (resources / "psi4.ini").read_text() full_config.read_string(ini_text) # Getting desperate! Look for an executable in the path if executor_type not in full_config: path = shutil.which("psi4") if path is None: raise RuntimeError( f"No section for '{executor_type}' in Psi4 ini file " f"({ini_dir / 'psi4.ini'}), nor in the defaults, nor " "in the path!" ) else: full_config[executor_type] = { "installation": "local", "code": f"{path} -n {{NTASKS}}", } config = dict(full_config.items(executor_type)) # Use the matching version of the seamm-psi4 image by default. config["version"] = self.version printer.important( self.indent + f" Psi4 will use {ce['NTASKS']} threads and {memory} memory\n" ) result = executor.run( cmd=["{code}"], config=config, directory=self.directory, files=files, return_files=return_files, in_situ=True, shell=True, ce=ce, ) if not result: self.logger.error("There was an error running Psi4") raise RuntimeError("There was an error running Psi4") self.logger.debug("\n" + pprint.pformat(result)) failed = False if "output.dat" in result["files"]: if result["output.dat"]["data"] is not None: if ( "*** Psi4 exiting successfully." not in result["output.dat"]["data"] ): self.logger.warning("Psi4 did not complete successfully.") failed = True if not failed: # Write a small file to say that LAMMPS ran successfully, so cancel # skip if rerunning. path = Path(self.directory) / "success.dat" path.write_text("success") # Analyze the results self.analyze() if failed: raise RuntimeError("Psi4 did not complete successfully.") return next_node
[docs] def analyze(self, indent="", **kwargs): """Do any analysis of the output from this step. Also print important results to the local step.out file using 'printer'. Parameters ---------- indent: str An extra indentation for the output """ # Update the structure directory = Path(self.directory) structure_file = directory / "final_structure.json" if structure_file.exists(): with structure_file.open(mode="r") as fd: structure = json.load(fd) if "geom" in structure: system_db = self.get_variable("_system_db") configuration = system_db.system.configuration xs = [] ys = [] zs = [] it = iter(structure["geom"]) for x in it: xs.append(x) ys.append(next(it)) zs.append(next(it)) configuration.atoms["x"][0:] = xs configuration.atoms["y"][0:] = ys configuration.atoms["z"][0:] = zs printer.important( self.indent + " Updated the system with the structure from Psi4", ) printer.important("") # Get the first real node node = self.subflowchart.get_node("1").next() # Loop over the subnodes, asking them to do their analysis while node is not None: for value in node.description: printer.important(value) node.analyze() printer.normal("") node = node.next()
def _convert_structure(self, name=None, no_com=True, no_reorient=True): """Convert the structure to the input for Psi4.""" system_db = self.get_variable("_system_db") configuration = system_db.system.configuration structure = [] if name is None: structure.append("molecule {") else: structure.append("molecule " + name + " {") # Charge and multiplicity structure.append(f" {configuration.charge} {configuration.spin_multiplicity}") elements = configuration.atoms.symbols coordinates = configuration.atoms.coordinates if "freeze" in configuration.atoms: freeze = configuration.atoms["freeze"] else: freeze = [""] * len(elements) for element, xyz, frz in zip(elements, coordinates, freeze): x, y, z = xyz structure.append( f" {element:2s} {float(x): 12.8f} {float(y): 12.8f} " f"{float(z): 12.8f}" ) structure.append("") if no_com: structure.append("no_com") if no_reorient: structure.append("no_reorient") structure.append("}") return "\n".join(structure) + "\n"