Source code for psi4_step.energy
# -*- coding: utf-8 -*-
"""Setup and run Psi4"""
import json
import logging
from pathlib import Path
from openbabel import openbabel
import psi4_step
import seamm
import seamm.data
from seamm_util import units_class
import seamm_util.printing as printing
from seamm_util.printing import FormattedText as __
logger = logging.getLogger(__name__)
job = printing.getPrinter()
printer = printing.getPrinter("psi4")
[docs]
class Energy(seamm.Node):
def __init__(self, flowchart=None, title="Energy", extension=None, logger=logger):
"""Initialize the node"""
logger.debug("Creating Energy {}".format(self))
super().__init__(
flowchart=flowchart, title=title, extension=extension, logger=logger
)
self._calculation = "energy"
self._model = None
self._metadata = psi4_step.metadata
self.parameters = psi4_step.EnergyParameters()
self.description = "A single point energy calculation"
@property
def header(self):
"""A printable header for this section of output"""
return "Step {}: {}".format(".".join(str(e) for e in self._id), self.title)
@property
def version(self):
"""The semantic version of this module."""
return psi4_step.__version__
@property
def git_revision(self):
"""The git version of this module."""
return psi4_step.__git_revision__
[docs]
def description_text(
self,
P=None,
calculation_type="Single-point energy",
configuration=None,
):
"""Prepare information about what this node will do"""
if P is not None:
P = self.parameters.values_to_dict()
if P["level"] == "recommended":
method = P["method"]
else:
method = P["advanced_method"]
if self.is_expr(method):
text = f"{calculation_type} using method given by {method}."
elif (
method in psi4_step.methods and psi4_step.methods[method]["method"] == "dft"
):
if P["level"] == "recommended":
functional = P["functional"]
else:
functional = P["advanced_functional"]
text = f"{calculation_type} using {method} with an "
text += f"exchange-correlation potential of {functional}"
if (
functional in psi4_step.dft_functionals
and len(psi4_step.dft_functionals[functional]["dispersion"]) > 1
and P["dispersion"] != "none"
):
text += f" with the {P['dispersion']} dispersion correction."
else:
text += " with no dispersion correction."
else:
text = f"{calculation_type} using {method}."
# Spin
if P["spin-restricted"] == "yes":
text += " The spin will be restricted to a pure eigenstate."
elif P["spin-restricted"] == "default":
if configuration is not None:
if configuration.spin_multiplicity == 1:
text += " The spin will be restricted to a pure eigenstate."
else:
text += " The spin will not be restricted and may not be a "
text += "proper eigenstate."
else:
text += " The spin will be restricted to a pure "
text += "eigenstate for singlet states. Otherwise it will not "
text += "be restricted and may not be a proper eigenstate."
elif self.is_expr(P["spin-restricted"]):
text += " Whether the spin will be restricted to a pure "
text += "eigenstate will be determined by {P['spin-restricted']}"
else:
text += " The spin will not be restricted and may not be a "
text += "proper eigenstate."
# Plotting
if isinstance(P["density"], str):
density = P["density"] != "no"
else:
density = P["density"]
if isinstance(P["orbitals"], str):
orbitals = P["orbitals"] != "no"
else:
orbitals = P["orbitals"]
if density:
if orbitals:
text += "\nThe alpha and beta electron, total, and spin densities, "
text += f"and orbitals {P['selected orbitals']} will be plotted."
elif orbitals:
text += f"\nThe orbitals {P['selected orbitals']} will be plotted."
return self.header + "\n" + __(text, **P, indent=4 * " ").__str__()
[docs]
def get_input(self, calculation_type="energy", restart=None):
"""Get the input for an energy calculation for Psi4"""
# Create the directory
directory = Path(self.directory)
directory.mkdir(parents=True, exist_ok=True)
_, configuration = self.get_system_configuration(None)
P = self.parameters.current_values_to_dict(
context=seamm.flowchart_variables._data
)
# Have to fix formatting for printing...
PP = dict(P)
for key in PP:
if isinstance(PP[key], units_class):
PP[key] = "{:~P}".format(PP[key])
self.description = []
self.description.append(
__(
self.description_text(PP, configuration=configuration),
**PP,
indent=self.indent,
)
)
lines = []
if calculation_type == "energy":
lines.append("")
lines.append("#" * 80)
lines.append(f"# {self.header}")
lines.append("#" * 80)
# Figure out what we are doing!
if P["level"] == "recommended":
method_string = P["method"]
else:
method_string = P["advanced_method"]
# Allow the full name, or the short name, or just pray.
if method_string in psi4_step.methods:
method = psi4_step.methods[method_string]["method"]
else:
method = method_string.lower()
for key in psi4_step.methods:
if psi4_step.methods[key]["method"] == method:
break
else:
method = method_string
# lines.append('set scf_type df')
# lines.append('set guess sad')
multiplicity = configuration.spin_multiplicity
spin_restricted = P["spin-restricted"]
if spin_restricted == "default":
if multiplicity == 1:
restricted = True
else:
restricted = False
elif spin_restricted == "yes":
restricted = True
else:
restricted = False
if method == "dft":
if restricted:
lines.append("set reference rks")
else:
lines.append("set reference uks")
else:
if restricted:
if multiplicity == 1:
lines.append("set reference rhf")
else:
lines.append("set reference rohf")
else:
lines.append("set reference uhf")
if "freeze-cores" in P and P["freeze-cores"] == "yes":
lines.append("set freeze_core True")
else:
lines.append("set freeze_core False")
if P["stability analysis"]:
lines.append("set stability_analysis True")
# Convergence parameters and methods
lines.append("set fail_on_maxiter False") # Psi4 hangs! Cope in the plug-in
lines.append(f"set maxiter {P['maximum iterations']}")
if P["density convergence"] != "default":
lines.append(f"set d_convergence {P['density convergence']}")
if P["energy convergence"] != "default":
lines.append(f"set e_convergence {P['energy convergence']}")
if P["use damping"]:
lines.append(f"set damping_percentage {P['damping percentage']}")
lines.append(f"set damping_convergence {P['damping convergence']}")
if P["use level shift"]:
lines.append(f"set level_shift {P['level shift']}")
lines.append(f"set level_shift_cutoff {P['level shift convergence']}")
if P["use soscf"]:
lines.append("set soscf True")
lines.append(
f"set soscf_start_convergence {P['soscf starting convergence']}"
)
lines.append(f"set soscf_conv {P['soscf convergence']}")
lines.append(f"set soscf_max_iter {P['soscf max iterations']}")
if P["soscf print iterations"]:
lines.append("set soscf_print True")
lines.append("")
if method == "dft":
if P["level"] == "recommended":
functional_string = P["functional"]
else:
functional_string = P["advanced_functional"]
# Allow the full name, or the short name, or just pray.
if functional_string in psi4_step.dft_functionals:
functional = psi4_step.dft_functionals[functional_string]["name"]
else:
functional = functional_string.lower()
for key in psi4_step.dft_functionals:
if psi4_step.dft_functionals[key]["name"] == functional:
break
else:
functional = functional_string
if (
P["dispersion"] != "none"
and len(psi4_step.dft_functionals[functional_string]["dispersion"]) > 1
):
functional = functional + "-" + P["dispersion"]
if restart is None:
lines.append(
f"Eelec, wfn = {calculation_type}('{functional}', return_wfn=True)"
)
else:
if calculation_type == "gradient":
lines.append(
f"Eelec, wfn = energy('{functional}', return_wfn=True,"
f" restart_file='{restart}')"
)
lines.append(f"G = gradient('{functional}', ref_wfn=wfn)")
else:
lines.append(
f"Eelec, wfn = {calculation_type}('{functional}', "
f"return_wfn=True, restart_file='{restart}')"
)
else:
if restart is None:
lines.append(
f"Eelec, wfn = {calculation_type}('{method}', return_wfn=True)"
)
else:
if calculation_type == "gradient":
lines.append(
f"Eelec, wfn = energy('{method}', return_wfn=True, "
f" restart_file='{restart}')"
)
lines.append(f"G = gradient('{method}', ref_wfn=wfn)")
else:
lines.append(
f"Eelec, wfn = {calculation_type}('{method}', return_wfn=True, "
f" restart_file='{restart}')"
)
if calculation_type != "gradient":
# Dump the properties to a json file
filename = f"@{self._id[-1]}+properties.json"
lines.append(
f"""
try:
oeprop(
wfn,
"MULTIPOLE(5)",
"ESP_AT_NUCLEI",
"LOWDIN_CHARGES",
"MULLIKEN_CHARGES",
"WIBERG_LOWDIN_INDICES",
"MAYER_INDICES",
"NO_OCCUPATIONS",
title="PROP"
)
except Exception:
print("Failed to calculate properties")
variables = scalar_variables()
variables.update(wfn.scalar_variables())
arrays = array_variables()
for item in arrays:
variables[item] = array_variable(item).np.tolist()
arrays = wfn.array_variables()
for item in arrays:
variables[item] = wfn.array_variable(item).np.tolist()
variables["Eelec"] = Eelec
variables["_method"] = "{method}"
variables["_method_string"] = "{method_string}"
with open("{filename}", "w") as fd:
json.dump(fix_multipoles(variables), fd, sort_keys=True, indent=3)
"""
)
# Orbital plots
lines.append(self.plot_input())
return "\n".join(lines)
[docs]
def analyze(self, indent="", data={}, out=[]):
"""Parse the output and generating the text output and store the
data in variables for other stages to access
"""
# Read in the results from json
directory = Path(self.directory)
json_file = directory / "properties.json"
if json_file.exists():
with json_file.open() as fd:
data = json.load(fd)
# Put any requested results into variables or tables
self.store_results(
data=data,
create_tables=self.parameters["create tables"].get(),
)
text = "The calculated energy is {Eelec:.6f} E_h."
else:
data = {}
tmp = str(json_file)
text = (
"\nThere are no results from Psi4. Perhaps it "
f"failed? Looking for {tmp}."
)
printer.normal(__(text, **data, indent=self.indent + 4 * " "))
raise RuntimeError(text)
# Write the structure locally for use in density and orbital plots
system, configuration = self.get_system_configuration()
obConversion = openbabel.OBConversion()
obConversion.SetOutFormat("sdf")
obMol = configuration.to_OBMol(properties="all")
title = f"SEAMM={system.name}/{configuration.name}"
obMol.SetTitle(title)
sdf = obConversion.WriteString(obMol)
path = directory / "structure.sdf"
path.write_text(sdf)
printer.normal(__(text, **data, indent=self.indent + 4 * " "))
[docs]
def plot_input(self):
"""Generate the input for plotting to cube files."""
_, configuration = self.get_system_configuration(None)
P = self.parameters.current_values_to_dict(
context=seamm.flowchart_variables._data
)
tasks = []
if P["density"]:
tasks.append("density")
orbitals = []
if P["orbitals"]:
tasks.append("orbitals")
# and work out the orbitals
txt = P["selected orbitals"]
if txt == "all":
pass
else:
# Which is the HOMO orbital?
# This will not work with ECPs.
n_electrons = (
sum(configuration.atoms.atomic_numbers) - configuration.charge
)
multiplicity = configuration.spin_multiplicity
homo = (n_electrons - (multiplicity - 1)) // 2 + (multiplicity - 1)
for chunk in txt.split(","):
chunk = chunk.strip()
if ":" in chunk or ".." in chunk:
if ":" in chunk:
first, last = chunk.split(":")
elif ".." in chunk:
first, last = chunk.split("..")
first = first.strip().upper()
last = last.strip().upper()
if first == "HOMO":
first = homo
elif first == "LUMO":
first = homo + 1
else:
first = int(first.removeprefix("HOMO").removeprefix("LUMO"))
if first < 0:
first = homo + first
else:
first = homo + 1 + first
if last == "HOMO":
last = homo
elif last == "LUMO":
last = homo + 1
else:
last = int(last.removeprefix("HOMO").removeprefix("LUMO"))
if last < 0:
last = homo + last
else:
last = homo + 1 + last
orbitals.extend(range(first, last + 1))
else:
first = chunk.strip().upper()
if first == "HOMO":
first = homo
elif first == "LUMO":
first = homo + 1
else:
first = int(first.removeprefix("HOMO").removeprefix("LUMO"))
if first < 0:
first = homo + first
else:
first = homo + 1 + first
orbitals.append(first)
if len(tasks) == 0:
return ""
lines = []
txt = "', '".join(tasks)
lines.append("")
lines.append("# Cube files for density and orbitals")
lines.append(f"set cubeprop_tasks ['{txt}']")
if len(orbitals) > 0:
txt = ", ".join([f"{i}, {-i}" for i in orbitals])
lines.append(f"set cubeprop_orbitals [{txt}]")
lines.append("")
lines.append("cubeprop(wfn)")
lines.append(
f"""
# Prefix the files with the substep number
paths = Path.cwd().glob('*.cube')
for path in paths:
name = path.name
newpath = path.with_name('@{self._id[-1]}+' + name)
path.rename(newpath)
"""
)
return "\n".join(lines)