# -*- coding: utf-8 -*-
"""Non-graphical part of the DFTB+ step in a SEAMM flowchart
"""
import configparser
import copy
import logging
from pathlib import Path
import pprint
import shutil
import traceback
import pandas
import cms_plots
import dftbplus_step
from .dftbplus import deep_merge, dict_to_hsd, parse_gen_file
from molsystem.elements import to_symbols
import seamm
import seamm_exec
import seamm_util.printing as printing
# In addition to the normal logger, two logger-like printing facilities are
# defined: 'job' and 'printer'. 'job' send output to the main job.out file for
# the job, and should be used very sparingly, typically to echo what this step
# will do in the initial summary of the job.
#
# 'printer' sends output to the file 'step.out' in this steps working
# directory, and is used for all normal output from this step.
logger = logging.getLogger(__name__)
job = printing.getPrinter()
printer = printing.getPrinter("DFTB+")
[docs]
def redimension(values, dimensions):
"""Change the dimensions on values to the new dimensions.
Parameters
----------
values : []
The incoming values
dimensions : [int]
The desired dimensions
Returns
-------
values : [[]...]
The redimensioned values.
"""
# Check that the number of values is OK
nvalues = 1
for dim in dimensions:
nvalues *= dim
if len(values) != nvalues:
raise ValueError(
f"Number of values given, {len(values)}, is not equal to the "
f"dimensions: {dimensions} --> {nvalues} values"
)
try:
_, result = _redimension_helper(values, dimensions)
except Exception as e:
print(f"Exception in redimension {type(e)}: {e}")
print(traceback.format_exc())
raise
return result
def _redimension_helper(values, dimensions):
dim = dimensions[-1]
remaining = dimensions[0:-1]
if len(remaining) == 0:
return values[dim:], values[0:dim]
else:
result = []
for i in range(dim):
values, tmp = _redimension_helper(values, remaining)
result.append(tmp)
return values, result
[docs]
class DftbBase(seamm.Node):
"""A base class for substeps in the DFTB+ step."""
def __init__(
self, flowchart=None, title="Choose Parameters", extension=None, logger=logger
):
self.mapping_from_primitive = None
self.mapping_to_primitive = None
self.results = None # Results of the calculation from the tag file.
super().__init__(flowchart=flowchart, title=title, extension=extension)
@property
def is_runable(self):
"""Indicate whether this not runs or just adds input."""
return True
@property
def model(self):
"""The model (chemistry) used to obtain results."""
return self.parent.model
@model.setter
def model(self, value):
self.parent.model = value
@property
def exe_config(self):
"""The configuration for the DFTB+ executable."""
return self.parent.exe_config
[docs]
def band_structure(
self, input_path, sym_points, sym_names, Efermi=[0.0, 0.0], DOS=None
):
"""Prepare the graph for the band structure.
Parameters
----------
path : filename or pathlib.Path
The path to the band output from DFTB+.
"""
Band_Structure = self.create_band_structure_data( # noqa: F841
input_path, sym_points, sym_names, Efermi=Efermi
)
figure = cms_plots.band_structure(
Band_Structure, DOS=DOS, template="band_structure.graph_template"
)
# Write it out.
wd = Path(self.directory)
figure.dump(wd / "band_structure.graph")
options = self.parent.options
write_html = "html" in options and options["html"]
if write_html:
figure.template = "band_structure.html_template"
figure.dump(wd / "band_structure.html")
[docs]
def create_band_structure_data(
self, input_path, sym_points, sym_names, Efermi=[0.0, 0.0]
):
"""Massage the band structure data into a standard form
Parameters
----------
path : filename or pathlib.Path
The path to the band output from DFTB+.
"""
wd = Path(self.directory)
logger.info(f"Preparing the band structure, {wd}")
seamm_options = self.parent.global_options
spin_polarized = len(Efermi) == 2
# Read configuration file for DFTB+
ini_dir = Path(seamm_options["root"]).expanduser()
full_config = configparser.ConfigParser()
full_config.read(ini_dir / "dftbplus.ini")
executor = self.parent.flowchart.executor
if spin_polarized:
cmd = ["dp_bands", "-s", input_path, "band"]
else:
cmd = ["dp_bands", input_path, "band"]
result = executor.run(
cmd=cmd,
config=self.exe_config,
directory=self.directory,
files={},
return_files=["*"],
in_situ=True,
shell=True,
)
if result is None:
logger.error("There was an error running the DOS code")
return None
if spin_polarized:
# Read the spin-up data
with open(wd / "band_s1.dat", "r") as fd:
BandStructure = pandas.read_csv(
fd,
sep=r"\s+",
header=None,
index_col=0,
comment="!",
)
mapper = {}
i = 0
for column in BandStructure.columns:
i += 1
mapper[column] = f"↑ {i}"
BandStructure[column] -= Efermi[0]
BandStructure.rename(columns=mapper, inplace=True)
# Read the spin-up data
with open(wd / "band_s2.dat", "r") as fd:
data = pandas.read_csv(
fd,
sep=r"\s+",
header=None,
index_col=0,
comment="!",
)
i = 0
for column in data.columns:
i += 1
label = f"↓ {i}"
BandStructure[label] = data[column] - Efermi[1]
else:
# Read the plot data
with open(wd / "band_tot.dat", "r") as fd:
BandStructure = pandas.read_csv(
fd,
sep=r"\s+",
header=None,
index_col=0,
comment="!",
)
mapper = {}
i = 0
for column in BandStructure.columns:
i += 1
mapper[column] = f"{i}"
BandStructure[column] -= Efermi[0]
BandStructure.rename(columns=mapper, inplace=True)
# Insert a column of labels
nrows = BandStructure.index.size
labels = [""] * nrows
for pt, label in zip(sym_points, sym_names):
labels[pt - 1] = label
BandStructure.insert(0, "labels", labels)
# And labels for each point in the path
BandStructure.insert(1, "points", self.path)
BandStructure.to_csv(Path(self.directory) / "BandStructure.csv")
return BandStructure
[docs]
def create_dos_data(self, input_path, Efermi=[0.0]):
"""Create the DOS data
Parameters
----------
input_path : filename or pathlib.Path
The path to the band output from DFTB+.
Efermi : float
The Fermi energy in eV
"""
spin_polarized = len(Efermi) == 2
if spin_polarized and Efermi[0] != Efermi[1]:
raise NotImplementedError(
f"Cannot handle different Fermi energies yet: {Efermi}"
)
logger.info("Preparing DOS")
# Total DOS
executor = self.parent.flowchart.executor
result = executor.run(
cmd=[
"dp_dos",
str(input_path),
"dos_total.dat",
],
config=self.exe_config,
directory=self.directory,
files={},
return_files=["*"],
in_situ=True,
shell=True,
)
if result is None:
logger.error("There was an error running the DOS code")
return None
# Read the total DOS data
wd = Path(self.directory)
with open(wd / "dos_total.dat", "r") as fd:
DOS = pandas.read_csv(
fd,
sep=r"\s+",
header=None,
comment="!",
index_col=0,
)
n_columns = DOS.shape[1]
if n_columns == 1:
if spin_polarized:
raise RuntimeError(
"Calculation is spin-polarized but total DOS is not."
)
DOS.rename(columns={0: "E", 1: "Total"}, inplace=True)
elif n_columns == 3:
if not spin_polarized:
raise RuntimeError(
"Calculation is not spin-polarized but total DOS is."
)
DOS.rename(
columns={0: "E", 1: "Total", 2: "Total ↑", 3: "Total ↓"},
inplace=True,
)
else:
raise RuntimeError(f"The total DOS has {n_columns} columns of data.")
# Partial DOS convention is "pdos_{element}.{shell no}.out"
# Figure out the elements and files for each.
files = {}
for path in sorted(wd.glob("pdos*.out")):
element = path.stem.split("_")[1].split(".")[0]
if element not in files:
files[element] = [path]
else:
files[element].append(path)
# Process each element, accumulating the total
for element, paths in files.items():
if spin_polarized:
total_up = None
total_down = None
else:
total = None
for path in paths:
out = path.with_suffix(".dat")
shell_no = int(path.suffixes[0][1:])
shell = ("s", "p", "d", "f")[shell_no - 1]
label = element + "_" + shell
result = executor.run(
cmd=[
"dp_dos",
"-w",
str(path),
str(out),
],
config=self.exe_config,
directory=self.directory,
files={},
return_files=["*"],
in_situ=True,
shell=True,
)
if result is None:
logger.error("There was an error running the DOS code")
return None
# Read the plot data
with open(out, "r") as fd:
data = pandas.read_csv(
fd,
sep=r"\s+",
header=None,
comment="!",
index_col=0,
)
n_columns = data.shape[1]
if n_columns == 1:
if spin_polarized:
raise RuntimeError(
f"Calculation is spin-polarized but {path} is not."
)
data.rename(columns={0: "E", 1: label}, inplace=True)
# Check the bounds. Sometimes rounding causes different grid sizes.
tmp = data.index.isin(DOS.index)
if not tmp.all():
data = data.loc[tmp]
tmp = DOS.index.isin(data.index)
if not tmp.all():
DOS = DOS.loc[tmp]
if total is not None:
total = total.loc[tmp]
if total is None:
total = data[label]
else:
total += data[label]
if not DOS.index.equals(data.index):
raise RuntimeError(
f"The energy values for partial DOS {label} are different!"
f" ({out})"
)
DOS[label] = data[label].array
elif n_columns == 3:
if not spin_polarized:
raise RuntimeError(
f"Calculation is not spin-polarized but {path} is."
)
data.rename(
columns={0: "E", 1: "Total", 2: "Up", 3: "Down"}, inplace=True
)
# Check the bounds.
tmp = data.index.isin(DOS.index)
if not tmp.all():
data = data.loc[tmp]
tmp = DOS.index.isin(data.index)
if not tmp.all():
DOS = DOS.loc[tmp]
if total_up is not None:
total_up = total_up.loc[tmp]
total_down = total_down.loc[tmp]
if total_up is None:
total_up = data["Up"]
total_down = data["Down"]
else:
total_up += data["Up"]
total_down += data["Down"]
if not DOS.index.equals(data.index):
raise RuntimeError(
f"The energy values for partial DOS {label} are different!"
f" ({out})"
)
DOS[label + " ↑"] = data["Up"].array
DOS[label + " ↓"] = data["Down"].array
else:
raise RuntimeError(f"The {path} has {n_columns} columns of data.")
if spin_polarized:
DOS[element + " ↑"] = total_up.array
DOS[element + " ↓"] = total_down.array
else:
DOS[element] = total.array
# Shift the Fermi level to 0
DOS.index -= Efermi[0]
DOS.to_csv(Path(self.directory) / "DOS.csv")
return DOS
[docs]
def dos(self, input_path, Efermi=[0.0]):
"""Prepare the graph for the density of states.
Parameters
----------
input_path : filename or pathlib.Path
The path to the band output from DFTB+.
Efermi : float
The Fermi energy in eV
"""
DOS = self.create_dos_data(input_path, Efermi=Efermi)
figure = cms_plots.dos(DOS, template="line.graph_template")
# Write it out.
figure.dump(Path(self.directory) / "DOS.graph")
options = self.parent.options
write_html = "html" in options and options["html"]
if write_html:
figure.template = "line.html_template"
figure.dump(Path(self.directory) / "DOS.html")
[docs]
def geometry(self):
"""Create the input for DFTB+ for the geometry.
Example::
Geometry = {
TypeNames = { "Ga" "As" }
TypesAndCoordinates [Angstrom] = {
1 0.000000 0.000000 0.000000
2 1.356773 1.356773 1.356773
}
Periodic = Yes
LatticeVectors [Angstrom] = {
2.713546 2.713546 0.
0. 2.713546 2.713546
2.713546 0. 2.713546
}
}
"""
_, configuration = self.get_system_configuration(None)
result = "Geometry = {\n"
elements = set(configuration.atoms.symbols)
elements = sorted([*elements])
names = '{"' + '" "'.join(elements) + '"}'
result += f" TypeNames = {names}\n"
if configuration.periodicity == 0:
result += " TypesAndCoordinates [Angstrom] = {\n"
for element, xyz in zip(
configuration.atoms.symbols,
configuration.atoms.get_coordinates(fractionals=False),
):
index = elements.index(element)
x, y, z = xyz
result += f" {index+1:>2} {x:10.6f} {y:10.6f} {z:10.6f}\n"
result += " }\n"
# The reference energy, if available
if self.parent._reference_energies is None:
self.parent._reference_energy = None
else:
energy = 0.0
for el in configuration.atoms.symbols:
energy += self.parent._reference_energies[el]
self.parent._reference_energy = energy
elif configuration.periodicity == 3:
if "primitive cell" in self.parameters:
use_primitive_cell = self.parameters["primitive cell"].get(
context=seamm.flowchart_variables._data
)
else:
use_primitive_cell = True
if use_primitive_cell:
# Write the structure using the primitive cell
(
lattice,
fractionals,
atomic_numbers,
self.mapping_from_primitive,
self.mapping_to_primitive,
) = configuration.primitive_cell()
else:
# Use the full cell
lattice = configuration.cell.vectors()
fractionals = configuration.atoms.get_coordinates(fractionals=True)
atomic_numbers = configuration.atoms.atomic_numbers
n_atoms = len(atomic_numbers)
self.mapping_from_primitive = [i for i in range(n_atoms)]
self.mapping_to_primitive = [i for i in range(n_atoms)]
symbols = to_symbols(atomic_numbers)
result += " Periodic = Yes\n"
result += " LatticeVectors [Angstrom] = {\n"
for xyz in lattice:
x, y, z = xyz
result += f" {x:15.9f} {y:15.9f} {z:15.9f}\n"
result += " }\n"
result += " TypesAndCoordinates [relative] = {\n"
for element, xyz in zip(symbols, fractionals):
index = elements.index(element)
x, y, z = xyz
result += f" {index+1:>2} {x:15.9f} {y:15.9f} {z:15.9f}\n"
result += " }\n"
# The reference energy, if available
if self.parent._reference_energies is None:
self.parent._reference_energy = None
else:
energy = 0.0
for el in symbols:
energy += self.parent._reference_energies[el]
self.parent._reference_energy = energy
result += "}\n"
return result
[docs]
def find_previous_step(self, cls, missing_ok=False):
"""Find the previous step of class 'cls'
Parameters
----------
cls : class
The class of the desired step.
missing_ok : bool = False
Don't raise an error, but return None if no step found
Returns
-------
seamm.Node
The node if found, None if not.
"""
result = None
for step in self.parent._steps[::-1]:
if isinstance(step, cls):
result = step
break
return result
[docs]
def get_previous_charges(self, missing_ok=False):
"""Copy charges from the previous energy step."""
directory = Path(self.directory)
step = self.find_previous_step(dftbplus_step.Energy, missing_ok=missing_ok)
if step is None:
if missing_ok:
return None
raise RuntimeError("Previous energy/optimization step not found.")
from_path = Path(step.directory) / "charges.dat"
if from_path.exists():
to_path = directory / "charges.dat"
if not to_path.exists() or not to_path.samefile(from_path):
shutil.copy2(from_path, to_path)
else:
if missing_ok:
return None
raise FileNotFoundError("No 'charges.dat' in previous energy step.")
return step
[docs]
def parse_results(self, lines):
"""Digest the data in the results.tag file."""
properties = dftbplus_step.metadata["results"]
property_data = {}
line_iter = enumerate(lines.splitlines(), start=1)
try:
while True:
lineno, line = next(line_iter)
if len(line) == 0 or line[0] == "#":
continue
if ":" not in line:
raise RuntimeError(
f"Problem parsing the results.tag file: {lineno}: " f"{line}"
)
key, _type, ndims, rest = line.split(":", maxsplit=3)
ndims = int(ndims)
key = key.strip()
if ndims == 0:
# scalar
lineno, line = next(line_iter)
if _type == "real":
property_data[key] = float(line)
else:
property_data[key] = line
else:
dims = [int(x) for x in rest.split(",")]
n_to_read = 1
for dim in dims:
n_to_read *= dim
values = []
while len(values) < n_to_read:
lineno, line = next(line_iter)
values.extend(line.strip().split())
if _type == "real":
values = [float(x) for x in values]
if key == "fermi_level":
# The Fermi level has one or two values if spin-polarized, but
# normally they are the same, so turn into a scalar.
property_data[key] = values[0]
else:
property_data[key] = redimension(values, dims)
if key not in properties:
self.logger.warning("Property '{}' not recognized.".format(key))
if key in properties and "units" in properties[key]:
property_data[key + ",units"] = properties[key]["units"]
except StopIteration:
pass
return property_data
[docs]
def run(self, current_input):
"""Run a DFTB+ step.
Parameters
----------
None
Returns
-------
seamm.Node
The next node object in the flowchart.
"""
system, configuration = self.get_system_configuration(None)
P = self.parameters.current_values_to_dict(
context=seamm.flowchart_variables._data
)
directory = Path(self.directory)
directory.mkdir(parents=True, exist_ok=True)
# Access the options
options = self.parent.options
seamm_options = self.parent.global_options
# Get the geometry first, because this sets up the primitive cell if needed
geom = self.geometry()
input_data = copy.deepcopy(current_input)
result = self.get_input()
deep_merge(input_data, result)
hsd = dict_to_hsd(input_data)
hsd += geom
# The header part of the output
for value in self.description:
printer.important(value)
# Check for successful run, don't rerun
success = directory / "success.dat"
if not success.exists():
files = {"dftb_in.hsd": hsd}
logger.info("dftb_in.hsd:\n" + files["dftb_in.hsd"])
# If the charge file exists, use it!
path = directory / "charges.dat"
if path.exists():
files["charges.dat"] = path.read_text()
# Write the input files to the current directory
for filename in files:
path = directory / filename
with path.open(mode="w") as fd:
fd.write(files[filename])
if not P["input only"]:
# Get the computational environment and set limits
ce = seamm_exec.computational_environment()
n_cores = ce["NTASKS"]
if seamm_options["ncores"] != "available":
n_cores = min(n_cores, int(seamm_options["ncores"]))
if options["use_openmp"]:
n_atoms = configuration.n_atoms
n_atoms_per_core = int(options["natoms_per_core"])
n_threads = int(round(n_atoms / n_atoms_per_core))
if n_threads > n_cores:
n_threads = n_cores
elif n_threads < 1:
n_threads = 1
else:
n_threads = 1
printer.important(
f" DFTB+ using {n_threads} OpenMP threads for {n_atoms} "
"atoms.\n"
)
env = {
"OMP_NUM_THREADS": str(n_threads),
}
return_files = [
"*.out",
"charges.*",
"dftb_pin.hsd",
"geom.out.*",
"output",
"pdos*",
"results.tag",
"*.xml",
"eigenvec.bin",
]
# Run the calculation
executor = self.parent.flowchart.executor
result = executor.run(
cmd=["{code}", ">", "DFTB+.out", "2>", "stderr.txt"],
config=self.exe_config,
directory=self.directory,
files=files,
return_files=return_files,
in_situ=True,
shell=True,
env=env,
)
if not result:
logger.error("There was an error running DFTB+")
return None
logger.debug("\n" + pprint.pformat(result))
if not P["input only"]:
# Parse the results.tag file
path = directory / "results.tag"
if path.exists():
data = path.read_text()
self.results = self.parse_results(data)
else:
self.results = {}
# And a final structure
path = directory / "geom.out.gen"
if path.exists():
data = path.read_text()
self.results["final structure"] = parse_gen_file(data)
# Analyze the results
self.analyze(data=self.results)
printer.important(" ")
# Ran successfully, put out the success file
success.write_text("success")