Source code for quickmin_step.quickmin

# -*- coding: utf-8 -*-

"""Non-graphical part of the QuickMin step in a SEAMM flowchart
"""

import os
import sys
import threading
import time

import logging
import numpy as np
from pathlib import Path
import pkg_resources
import pprint  # noqa: F401
import shutil
import string
import subprocess

from openbabel import openbabel

import molsystem
import quickmin_step
import seamm
from seamm_util import ureg, Q_  # noqa: F401
import seamm_util.printing as printing
from seamm_util.printing import FormattedText as __

# Add this modules properties to the standard properties
path = Path(pkg_resources.resource_filename(__name__, "data/"))
csv_file = path / "properties.csv"
if path.exists():
    molsystem.add_properties_from_file(csv_file)

if "OpenBabel_version" not in globals():
    OpenBabel_version = None


[docs] class OutputGrabber(object): """Class used to grab standard output or another stream. see https://stackoverflow.com/questions/24277488/in-python-how-to-capture-the-stdout-from-a-c-shared-library-to-a-variable/29834357#29834357 # noqa: E501 """ escape_char = "\b" def __init__(self, stream=None, threaded=False): self.origstream = stream self.threaded = threaded if self.origstream is None: self.origstream = sys.stdout self.origstreamfd = self.origstream.fileno() self.capturedtext = "" # Create a pipe so the stream can be captured: self.pipe_out, self.pipe_in = os.pipe() def __enter__(self): self.start() return self def __exit__(self, type, value, traceback): self.stop()
[docs] def start(self): """ Start capturing the stream data. """ self.capturedtext = "" # Save a copy of the stream: self.streamfd = os.dup(self.origstreamfd) # Replace the original stream with our write pipe: os.dup2(self.pipe_in, self.origstreamfd) if self.threaded: # Start thread that will read the stream: self.workerThread = threading.Thread(target=self.readOutput) self.workerThread.start() # Make sure that the thread is running and os.read() has executed: time.sleep(0.01)
[docs] def stop(self): """ Stop capturing the stream data and save the text in `capturedtext`. """ # Print the escape character to make the readOutput method stop: self.origstream.write(self.escape_char) # Flush the stream to make sure all our data goes in before # the escape character: self.origstream.flush() if self.threaded: # wait until the thread finishes so we are sure that # we have until the last character: self.workerThread.join() else: self.readOutput() # Close the pipe: os.close(self.pipe_in) os.close(self.pipe_out) # Restore the original stream: os.dup2(self.streamfd, self.origstreamfd) # Close the duplicate stream: os.close(self.streamfd)
[docs] def readOutput(self): """ Read the stream data (one byte at a time) and save the text in `capturedtext`. """ while True: char = os.read(self.pipe_out, 1).decode(self.origstream.encoding) if not char or self.escape_char in char: break self.capturedtext += char
# In addition to the normal logger, two logger-like printing facilities are # defined: "job" and "printer". "job" send output to the main job.out file for # the job, and should be used very sparingly, typically to echo what this step # will do in the initial summary of the job. # # "printer" sends output to the file "step.out" in this steps working # directory, and is used for all normal output from this step. logger = logging.getLogger(__name__) job = printing.getPrinter() printer = printing.getPrinter("QuickMin")
[docs] class QuickMin(seamm.Node): """ The non-graphical part of a QuickMin step in a flowchart. Parameters ---------- parser : configargparse.ArgParser The parser object. options : tuple It contains a two item tuple containing the populated namespace and the list of remaining argument strings. subflowchart : seamm.Flowchart A SEAMM Flowchart object that represents a subflowchart, if needed. parameters : QuickMinParameters The control parameters for QuickMin. See Also -------- TkQuickMin, QuickMin, QuickMinParameters """ def __init__(self, flowchart=None, title="QuickMin", extension=None, logger=logger): """A step for QuickMin in a SEAMM flowchart. You may wish to change the title above, which is the string displayed in the box representing the step in the flowchart. Parameters ---------- flowchart: seamm.Flowchart The non-graphical flowchart that contains this step. title: str The name displayed in the flowchart. extension: None Not yet implemented logger : Logger = logger The logger to use and pass to parent classes Returns ------- None """ logger.debug(f"Creating QuickMin {self}") super().__init__( flowchart=flowchart, title="QuickMin", extension=extension, module=__name__, logger=logger, ) # yapf: disable self._metadata = quickmin_step.metadata self.parameters = quickmin_step.QuickMinParameters() @property def version(self): """The semantic version of this module.""" return quickmin_step.__version__ @property def git_revision(self): """The git version of this module.""" return quickmin_step.__git_revision__
[docs] def description_text(self, P=None): """Create the text description of what this step will do. The dictionary of control values is passed in as P so that the code can test values, etc. Parameters ---------- P: dict An optional dictionary of the current values of the control parameters. Returns ------- str A description of the current step. """ if P is None: P = self.parameters.values_to_dict() calculation = P["calculation"] n_steps = P["n_steps"] forcefield = P["forcefield"] if forcefield == "best available": ff_name = "the best available forcefield" else: ff_name = forcefield.split()[0] if calculation == "optimization": text = f"Minimizing the structure with {ff_name}, with a maximum of " text += f"{n_steps} steps." if P["forcefield"] == "best available": kwargs = {} else: kwargs = {"forcefield": ff_name} text += seamm.standard_parameters.structure_handling_description( P, **kwargs ) else: text = f"Performing a quick energy calculation with {ff_name}." return self.header + "\n" + __(text, indent=4 * " ").__str__()
[docs] def run(self): """Run a QuickMin step. Parameters ---------- None Returns ------- seamm.Node The next node object in the flowchart. """ global OpenBabel_version next_node = super().run(printer) # Get the values of the parameters, dereferencing any variables P = self.parameters.current_values_to_dict( context=seamm.flowchart_variables._data ) calculation = P["calculation"] # Print what we are doing printer.important(__(self.description_text(P))) # Add the citations for Open Babel self.references.cite( raw=self._bibliography["openbabel"], alias="openbabel_jcinf", module="quickmin_step", level=1, note="The principle Open Babel citation.", ) # See if we can get the version of obabel if OpenBabel_version is None: path = shutil.which("obabel") if path is not None: path = Path(path).expanduser().resolve() try: result = subprocess.run( [str(path), "--version"], stdin=subprocess.DEVNULL, capture_output=True, text=True, ) except Exception: OpenBabel_version = "unknown" else: OpenBabel_version = "unknown" lines = result.stdout.splitlines() for line in lines: line = line.strip() tmp = line.split() if len(tmp) == 9 and tmp[0] == "Open": OpenBabel_version = { "version": tmp[2], "month": tmp[4], "year": tmp[6], } break if isinstance(OpenBabel_version, dict): try: template = string.Template(self._bibliography["obabel"]) citation = template.substitute( month=OpenBabel_version["month"], version=OpenBabel_version["version"], year=OpenBabel_version["year"], ) self.references.cite( raw=citation, alias="obabel-exe", module="quickmin_step", level=1, note="The principle citation for the Open Babel executables.", ) except Exception: pass # Add the citations for Open Babel self.references.cite( raw=self._bibliography["bs93"], alias="PATTY", module="quickmin_step", level=1, note="The atom typer in OpenBabel.", ) directory = Path(self.directory) directory.mkdir(parents=True, exist_ok=True) # Get the current system and configuration (ignoring the system...) system, configuration = self.get_system_configuration(None) obmol = configuration.to_OBMol() gradients = [] if P["forcefield"] == "best available": for ff_name in ("GAFF", "MMFF94s", "Ghemical", "UFF"): obFF = openbabel.OBForceField.FindForceField(ff_name) if obFF is None: self.logger.warning(f"Couldn't find forcefield '{ff_name}'") continue obFF.SetLogToStdErr() obFF.SetLogLevel(1) # see https://stackoverflow.com/questions/50978464/redirect-logs-to-file-in-pybel # noqa: E501 out = OutputGrabber(sys.stderr) with out: if not obFF.Setup(obmol): if ff_name == "UFF": raise RuntimeError( "Could not find a forcefield for the molecule" ) continue if calculation == "optimization": obFF.ConjugateGradients(P["n_steps"]) energy = obFF.Energy(True) units = obFF.GetUnit() # Capture the gradients. These appear to be forces, so negate factor = -Q_(1.0, units).m_as("kJ/mol") for atom in openbabel.OBMolAtomIter(obmol): # vector objects have to be de-referenced individually (sigh) grad = obFF.GetGradient(atom) gradients.append( [ factor * grad.GetX(), factor * grad.GetY(), factor * grad.GetZ(), ] ) if calculation == "optimization": path = Path(self.directory) / "min.out" else: path = Path(self.directory) / "energy.out" path.write_text(out.capturedtext) break else: ff_name = P["forcefield"].split()[0] obFF = openbabel.OBForceField.FindForceField(ff_name) if obFF is None: raise RuntimeError(f"Couldn't find forcefield '{ff_name}'") obFF.SetLogToStdErr() obFF.SetLogLevel(1) # see https://stackoverflow.com/questions/50978464/redirect-logs-to-file-in-pybel # noqa: E501 out = OutputGrabber(sys.stderr) with out: if not obFF.Setup(obmol): raise RuntimeError( f"Could not assign forcefield {ff_name} to the molecule" ) if calculation == "optimization": obFF.ConjugateGradients(P["n_steps"]) obFF.GetCoordinates(obmol) energy = obFF.Energy(True) units = obFF.GetUnit() # Capture the gradients. These appear to be forces, so negate factor = -Q_(1.0, units).m_as("kJ/mol") for atom in openbabel.OBMolAtomIter(obmol): # vector objects have to be de-referenced individually (sigh) grad = obFF.GetGradient(atom) gradients.append( [ factor * grad.GetX(), factor * grad.GetY(), factor * grad.GetZ(), ] ) if calculation == "optimization": path = Path(self.directory) / "min.out" else: path = Path(self.directory) / "energy.out" path.write_text(out.capturedtext) # Set the model chemistry to the forcefield name. self._model = ff_name # Check for convergence lines = out.capturedtext.splitlines() tmp = lines[-2].split() if len(tmp) == 3: n_iterations = tmp[0] else: n_iterations = "unknown" converged = "HAS CONVERGED" in lines[-1] # Set up the results data data = {} if units == "kJ/mol": data["energy"] = energy data["gradients"] = gradients else: data["energy"] = Q_(energy, units).m_as("kJ/mol") tmp = np.array(gradients) * Q_(1.0, units).m_as("kJ/mol") data["gradients"] = tmp.tolist() data["forcefield"] = ff_name data["model"] = self.model if calculation == "optimization": if converged: text = ( f"The minimization using {ff_name} converged in {n_iterations} " f"steps to {energy:.3f} {units}. " ) else: text = ( f"The minimization with {ff_name} did not converge in " f"{n_iterations} steps! The final energy was {energy:.3f} {units}. " ) else: text = f"Calculated the energy and gradients using {ff_name}" # Put any requested results into variables or tables self.store_results( configuration=configuration, data=data, ) # Save the structure system, configuration = self.get_system_configuration(P) configuration.coordinates_from_OBMol(obmol) text += seamm.standard_parameters.set_names( system, configuration, P, _first=True, forcefield=ff_name ) printer.normal(__(text, indent=4 * " ")) printer.normal("") # Add the citation(s) for the forcefield if "MMFF94" in ff_name: self.references.cite( raw=self._bibliography["MMFF94-1"], alias="MMFF94-1", module="quickmin_step", level=1, note="The main MMFF94 citation.", ) if ff_name == "MMFF94s": self.references.cite( raw=self._bibliography["MMFF94s"], alias="MMFF94s", module="quickmin_step", level=1, note="The main MMFF94, static version, citation.", ) # The remaining MMF94 citations for i in range(2, 6): citation = f"MMFF94-{i}" self.references.cite( raw=self._bibliography[citation], alias=citation, module="quickmin_step", level=2, note=f"The MMFF94 citation #{i}.", ) else: self.references.cite( raw=self._bibliography[ff_name], alias=ff_name, module="quickmin_step", level=1, note=f"The main {ff_name} citation.", ) # Add other citations here or in the appropriate place in the code. # Add the bibtex to data/references.bib, and add a self.reference.cite # similar to the above to actually add the citation to the references. return next_node