# -*- coding: utf-8 -*-
"""Non-graphical part of the Read Structure step in a SEAMM flowchart
In addition to the normal logger, two logger-like printing facilities are
defined: 'job' and 'printer'. 'job' send output to the main job.out file for
the job, and should be used very sparingly, typically to echo what this step
will do in the initial summary of the job.
'printer' sends output to the file 'step.out' in this steps working
directory, and is used for all normal output from this step.
"""
import logging
from pathlib import PurePath, Path
import tarfile
import tempfile
import textwrap
from .formats.registries import get_format_metadata
import read_structure_step
from .read import read
import seamm
from seamm_util import ureg, Q_ # noqa: F401
from seamm_util import getParser
import seamm_util.printing as printing
from seamm_util.printing import FormattedText as __
from .utils import guess_extension
logger = logging.getLogger(__name__)
job = printing.getPrinter()
printer = printing.getPrinter("Read Structure")
[docs]
class ReadStructure(seamm.Node):
def __init__(self, flowchart=None, title="Read Structure", extension=None):
"""A step for Read Structure in a SEAMM flowchart.
You may wish to change the title above, which is the string displayed
in the box representing the step in the flowchart.
Parameters:
flowchart: The flowchart that contains this step.
title: The name displayed in the flowchart.
extension: ??
Returns:
None
"""
logger.debug("Creating Read Structure {}".format(self))
# Set the logging level for this module if requested
# if 'read_structure_step_log_level' in self.options:
# logger.setLevel(self.options.read_structure_step_log_level)
super().__init__(
flowchart=flowchart, title=title, extension=extension, logger=logger
) # yapf: disable
self.parameters = read_structure_step.ReadStructureParameters()
@property
def version(self):
"""The semantic version of this module."""
return read_structure_step.__version__
@property
def git_revision(self):
"""The git version of this module."""
return read_structure_step.__git_revision__
[docs]
def create_parser(self):
"""Setup the command-line / config file parser"""
# Need to mimic MOPAC step to find the MOPAC executable
parser_name = "mopac-step"
parser = getParser()
# Remember if the parser exists ... this type of step may have been
# found before
parser_exists = parser.exists(parser_name)
# Create the standard options, e.g. log-level
result = super().create_parser(name=parser_name)
if parser_exists:
return result
# Options for Mopac
parser.add_argument(
parser_name,
"--mopac-exe",
default="MOPAC2016.exe",
help="the name of the MOPAC executable",
)
parser.add_argument(
parser_name,
"--mopac-path",
default="",
help="the path to the MOPAC executable",
)
parser.add_argument(
parser_name,
"--ncores",
default="default",
help="How many threads to use in MOPAC",
)
parser.add_argument(
parser_name,
"--mkl-num-threads",
default="default",
help="How many threads to use with MKL in MOPAC",
)
parser.add_argument(
parser_name,
"--max-atoms-to-print",
default=25,
help="Maximum number of atoms to print charges, etc.",
)
return result
[docs]
def description_text(self, P=None):
"""Create the text description of what this step will do.
The dictionary of control values is passed in as P so that
the code can test values, etc.
Keyword arguments:
P: An optional dictionary of the current values of the control
parameters.
"""
if not P:
P = self.parameters.values_to_dict()
text = f"Read structure from {P['file']}. "
# What type of file?
extension = ""
if isinstance(P["file"], Path):
filename = str(P["file"])
else:
filename = P["file"].strip()
file_type = P["file type"]
if self.is_expr(filename) or self.is_expr(file_type):
extension = "all"
else:
if file_type != "from extension":
extension = file_type.split()[0]
else:
if self.is_expr(filename):
extension = "all"
elif filename != "":
path = PurePath(filename)
extension = path.suffix
if extension == ".gz":
extension = path.with_suffix("").suffix
# Get the metadata for the format
metadata = get_format_metadata(extension)
if extension == "all" or not metadata["single_structure"]:
text += seamm.standard_parameters.multiple_structure_handling_description(P)
else:
text += seamm.standard_parameters.structure_handling_description(P)
text = textwrap.fill(text, initial_indent=4 * " ", subsequent_indent=4 * " ")
return self.header + "\n" + text
[docs]
def run(self):
"""Run a Read Structure step."""
next_node = super().run(printer)
# Get the values of the parameters, dereferencing any variables
P = self.parameters.current_values_to_dict(
context=seamm.flowchart_variables._data
)
# Check for tar files, potentially compressed
if isinstance(P["file"], Path):
path = P["file"].expanduser().resolve()
else:
path = Path(P["file"].strip()).expanduser().resolve()
extensions = path.suffixes
if ".tar" in extensions or ".tgz" in extensions:
self.read_tarfile(path, P)
else:
# What type of file?
filename = str(path)
file_type = P["file type"]
if file_type != "from extension":
extension = file_type.split()[0]
else:
extension = path.suffix
if extension == ".gz":
extension = path.with_suffix("").suffix
if extension == "":
extension = guess_extension(filename, use_file_name=False)
P["file type"] = extension
# Print what we are doing
printer.important(self.description_text(P))
# Read the file into the system
system_db = self.get_variable("_system_db")
system, configuration = self.get_system_configuration(P, same_as=None)
configurations = read(
filename,
configuration,
extension=extension,
add_hydrogens=P["add hydrogens"],
system_db=system_db,
system=system,
indices=P["indices"],
subsequent_as_configurations=(
P["subsequent structure handling"] == "Create a new configuration"
),
system_name=str(P["system name"]),
configuration_name=str(P["configuration name"]),
printer=printer.important,
references=self.references,
bibliography=self._bibliography,
)
# Finish the output
system, configuration = self.get_system_configuration()
if configurations is None or len(configurations) == 1:
if configuration.periodicity == 3:
space_group = configuration.symmetry.group
if space_group == "":
symmetry_info = ""
else:
symmetry_info = f" The space group is {space_group}."
printer.important(
__(
"\n Created a periodic structure with "
f"{configuration.n_atoms} atoms. {symmetry_info}"
f"\n System name = {system.name}"
f"\n Configuration name = {configuration.name}",
indent=4 * " ",
)
)
else:
printer.important(
__(
"\n Created a molecular structure with "
f"{configuration.n_atoms} atoms."
f"\n System name = {system.name}"
f"\n Configuration name = {configuration.name}",
indent=4 * " ",
)
)
printer.important("")
return next_node
[docs]
def read_tarfile(self, tarfile_path, P):
"""Read structures from a tarfile.
Parameters
----------
path : pathlib.Path
The path to the tarfile.
P : {str: str}
Dictionary of control parameters for this step.
"""
file_type = P["file type"]
if file_type != "from extension":
extensions = [file_type.split()[0]]
as_configurations = (
P["subsequent structure handling"] == "Create a new configuration"
)
n = 0
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_dir_path = Path(tmp_dir)
with tarfile.open(tarfile_path.expanduser(), "r") as tar:
for member in tar:
if not member.isfile():
continue
if member.name[0] == ".":
continue
path = PurePath(member.name)
if path.name[0] == ".":
continue
extension = path.suffix
# If explicit extension does not match, skip.
if file_type != "from extension" and extension not in extensions:
continue
# For the time being write the contents to a file. Eventually should
# rewrite all the routines to handle text as well as files.
fd = tar.extractfile(member)
if fd is None:
fd.close()
continue
data = fd.read()
fd.close()
tmp_path = tmp_dir_path / path.name
tmp_path.write_bytes(data)
filename = str(tmp_path)
if extension == "":
extension = guess_extension(filename)
# Read the file into the system
system_db = self.get_variable("_system_db")
system, configuration = self.get_system_configuration(
P, same_as=None
)
read(
filename,
configuration,
extension=extension,
add_hydrogens=P["add hydrogens"],
system_db=system_db,
system=system,
indices=P["indices"],
subsequent_as_configurations=as_configurations,
system_name=P["system name"],
configuration_name=P["configuration name"],
printer=printer.important,
references=self.references,
bibliography=self._bibliography,
)
tmp_path.unlink()
n += 1
if n % 1000 == 0:
print(n)
printer.important(
__(
f"\n Created {n} structures from the tarfile {tarfile}",
indent=4 * " ",
)
)