# -*- coding: utf-8 -*-
"""Setup and run Psi4"""
import json
import logging
from pathlib import Path
import cclib
from molsystem import RMSD
import psi4_step
import seamm
import seamm.data
from seamm_util import units_class
import seamm_util.printing as printing
from seamm_util.printing import FormattedText as __
try:
from itertools import batched
except ImportError:
from itertools import islice
def batched(iterable, n):
"Batch data into tuples of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
raise ValueError("n must be at least one")
it = iter(iterable)
while batch := tuple(islice(it, n)):
yield batch
logger = logging.getLogger(__name__)
job = printing.getPrinter()
printer = printing.getPrinter("psi4")
[docs]
class Optimization(psi4_step.Energy):
def __init__(
self,
flowchart=None,
title="Optimization",
extension=None,
logger=logger,
):
"""Initialize the node"""
logger.debug("Creating Optimization {}".format(self))
super().__init__(flowchart=flowchart, title=title, extension=extension)
self._calculation = "optimization"
self._model = None
self._metadata = psi4_step.metadata
self.parameters = psi4_step.OptimizationParameters()
self.description = "A geometry optimization"
[docs]
def analyze(self, indent="", data=None, out=[], table=None):
"""Parse the output and generating the text output and store the
data in variables for other stages to access
"""
directory = Path(self.directory)
text = ""
# Get the results
if data is None:
# Use cclib to get results. Note the file is one level up.
path = directory.parent / "output.dat"
if path.exists():
data = vars(cclib.io.ccread(path))
data = self.process_data(data)
else:
data = {}
# Read in the results from json
directory = Path(self.directory)
json_file = directory / "properties.json"
if not json_file.exists():
data = {}
tmp = str(json_file)
text = (
"\nThere are no results from Psi4. Perhaps it "
f"failed? Looking for {tmp}."
)
printer.normal(__(text, **data, indent=self.indent + 4 * " "))
raise RuntimeError(text)
with json_file.open() as fd:
tmp = json.load(fd)
data.update(**tmp)
# Get the structure, if changed
method, functional, extended_functional, method_string = self.get_method()
P = self.parameters.current_values_to_dict(
context=seamm.flowchart_variables._data
)
_, initial_configuration = self.get_system_configuration()
system, configuration = self.get_system_configuration(P)
update_structure = P["structure handling"] != "Discard the structure"
initial = initial_configuration.to_RDKMol()
if update_structure:
final = configuration.to_RDKMol()
else:
final = initial_configuration.to_RDKMol()
# Get the updated coordinates
structure_file = directory / "final_structure.json"
if structure_file.exists():
with structure_file.open(mode="r") as fd:
structure = json.load(fd)
if "geom" in structure:
XYZs = [batched(structure["geom"], 3)]
final.GetConformer(0).SetPositions(XYZs)
result = RMSD(final, initial, symmetry=True, include_h=True, align=True)
data["RMSD with H"] = result["RMSD"]
data["displaced atom with H"] = result["displaced atom"]
data["maximum displacement with H"] = result["maximum displacement"]
# Align the structure
if update_structure:
configuration.from_RDKMol(final)
# And the name of the configuration.
text = seamm.standard_parameters.set_names(
system,
configuration,
P,
_first=True,
model=self.parent.model,
)
printer.normal(__(text, **data, indent=self.indent + 4 * " "))
printer.normal("")
result = RMSD(final, initial, symmetry=True)
data["RMSD"] = result["RMSD"]
data["displaced atom"] = result["displaced atom"]
data["maximum displacement"] = result["maximum displacement"]
# Create the optimization part of the output
if table is None:
table = {
"Property": [],
"Value": [],
"Units": [],
}
if "optdone" in data:
table["Property"].append("Converged")
table["Value"].append(str(data["optdone"]))
table["Units"].append("")
else:
table["Property"].append("Converged")
table["Value"].append("unknown")
table["Units"].append("")
if "optstatus" in data:
table["Property"].append("# Steps")
table["Value"].append(str(len(data["optstatus"])))
table["Units"].append("")
else:
table["Property"].append("# Steps")
table["Value"].append("unknown")
table["Units"].append("")
if "RMSD" in data:
tmp = data["RMSD"]
table["Property"].append("RMSD in Geometry")
table["Value"].append(f"{tmp:.2f}")
table["Units"].append("Å")
if "maximum displacement" in data:
tmp = data["maximum displacement"]
table["Property"].append("Largest Displacement")
table["Value"].append(f"{tmp:.2f}")
table["Units"].append("Å")
if "displaced atom" in data:
tmp = data["displaced atom"]
table["Property"].append("Displaced Atom")
table["Value"].append(f"{tmp + 1}")
table["Units"].append("")
super().analyze(indent=indent, data=data, out=out, table=table)
[docs]
def description_text(
self,
P=None,
calculation_type="Geometry optimization",
configuration=None,
):
"""Prepare information about what this node will do"""
if not P:
P = self.parameters.values_to_dict()
text = super().description_text(
P=P, calculation_type=calculation_type, configuration=configuration
)
added = "\nThe geometry optimization will use the {optimization method} "
if P["max geometry steps"] == "default":
added += "method, using the default maximum number of steps, which"
added += " is based on the system size."
else:
added += "method, with no more than {max geometry steps} steps."
if P["geometry convergence"] == "Custom":
added += " The convergence criterion is"
else:
added += " The convergence criterion is '{geometry convergence}'."
if P["recalc hessian"] != "never":
added += " The Hessian will be recalculated every {recalc hessian}"
added += " steps. Note that calculating the second derivatives is "
added += "quite expensive!"
if self.parent.model is None:
kwargs = {"model": "<model>"}
else:
kwargs = {"model": self.model}
added += seamm.standard_parameters.structure_handling_description(P, **kwargs)
return text + "\n" + __(added, **P, indent=4 * " ").__str__()