# -*- coding: utf-8 -*-
"""A flowchart, which is a set of nodes. There must be a single
'start' node, with other nodes connected via their ports to describe
the flowchart. There may be isolated nodes or groups of connected nodes;
however, the flow starts at the 'start' node and follows the connections,
so isolated nodes and fragments will not be executed."""
from datetime import datetime
import hashlib
import json
import logging
import os
import os.path
import stat
from packaging.version import Version
import seamm
from .seammrc import SEAMMrc
import seamm_util
logger = logging.getLogger(__name__)
[docs]
class Flowchart(object):
graphics = "Tk"
"""str: The default graphics to use for display, if
needed. Default: 'Tk'
"""
def __init__(
self,
parent=None,
data=None,
namespace="org.molssi.seamm",
name="",
description="",
directory=None,
output="files",
parser_name="SEAMM",
):
"""Initialize the flowchart.
Parameters
----------
parent : Object
The parent of the nodes in this flowchart.
data : dict
An initial graph.
namespace : str
The namespace for locating plug-ins.
name : str
A name for the flowchart
description : str
A description of the flowcharts.
directory : str
The root directory for files for this flowchart.
output : str
Where to direct output. Currently not used.
parser_name : str = "SEAMM"
The name of the parser.
"""
self.graph = seamm.Graph()
self.parent = parent
self.output = output # Where to print output, files, stdout, both
self.metadata = {}
self.reset_metadata(title=name, description=description)
self._executor = None
self._parser_name = parser_name
# Setup the plugin handling
self.plugin_manager = seamm.PluginManager(namespace)
# and make sure that the start node exists
self.add_node(seamm.StartNode(flowchart=self))
# And the root directory
self.root_directory = directory
# And the parser associated with this flowchart
self._parser = None
def __iter__(self):
return self.graph.__iter__()
@property
def executor(self):
"""The executor for tasks."""
return self._executor
@executor.setter
def executor(self, value):
self._executor = value
@property
def is_development(self):
"""Check if any of nodes are development versions."""
for node in self:
if Version(node.version).is_prerelease:
return True
return False
@property
def root_directory(self):
"""The root directory for files, etc for this flowchart"""
if self._root_directory is None:
self._root_directory = os.path.join(
os.getcwd(), datetime.now().isoformat(sep="_", timespec="seconds")
)
return self._root_directory
@root_directory.setter
def root_directory(self, value):
self._root_directory = value
@property
def output(self):
"""Where to print output:
files: to files in subdirectories
stdout: to standard output (for intereactive use)
both: to both files and standard output
"""
return self._output
@output.setter
def output(self, value):
if value in ("files", "stdout", "both"):
self._output = value
else:
raise RuntimeError(
"flowchart.output must be one of 'files', 'stdout', or 'both'"
", not '{}'".format(value)
)
@property
def parser(self):
"""The SEAMM parser associated with this flowchart."""
if self._parser is None:
self._parser = seamm_util.seamm_parser(self._parser_name)
return self._parser
@parser.setter
def parser(self, value):
self._parser = value
# -------------------------------------------------------------------------
# Node creation and deletion
# -------------------------------------------------------------------------
[docs]
def create_node(self, extension_name):
"""Create a new node given the extension name"""
plugin = self.plugin_manager.get(extension_name)
node = plugin.create_node(flowchart=self, extension=extension_name)
node.parent = self.parent
return node
[docs]
def add_node(self, n, **attr):
"""Add a single node n, ensuring that it knows the flowchart"""
n.flowchart = self
return self.graph.add_node(n, **attr)
[docs]
def remove_node(self, node):
"""Delete a node from the flowchart, and from the graphics if
necessary
"""
# Remove the drawing of the node
node.undraw()
# and any edges, including graphics if appropriate
node.remove_edge("all")
# and the node
self.graph.remove_node(node)
# -------------------------------------------------------------------------
# Finding nodes
# -------------------------------------------------------------------------
[docs]
def tag_exists(self, tag):
"""Check if the node with a given tag exists. A tag is a string like
'node=<uuid>', where <uuid> is the unique integer id for the node.
"""
for node in self:
if node.tag == tag:
return True
return False
[docs]
def get_node(self, uuid):
"""Return the node with a given uuid"""
if isinstance(uuid, int):
uuid = str(uuid)
for node in self:
if str(node.uuid) == uuid:
return node
return None
[docs]
def get_nodes(self):
"Return a list of all the nodes in the traversal."
logger.debug("Getting nodes")
# Reset the visited flag to check for loops
self.reset_visited()
# Get the start node
next_node = self.get_node("1")
# And traverse the nodes.
nodes = []
while next_node:
nodes.append(next_node)
next_node = next_node.next()
logger.debug("Finished getting nodes")
return nodes
[docs]
def last_node(self, node="1"):
"""Find the last node walking down the main execution path
from the given node, which defaults to the start node"""
if isinstance(node, str):
node = self.get_node(node)
for edge in self.graph.edges(node, direction="out"):
if edge.edge_type == "execution":
return self.last_node(edge.node2)
return node
[docs]
def clear(self, all=False):
"""Override the underlying clear() to ensure that the start node is present"""
self.graph.clear()
# and make sure that the start node exists
if not all:
start_node = seamm.StartNode(flowchart=self)
self.add_node(start_node)
[docs]
def list_nodes(self):
"""List the nodes, for debugging"""
result = []
for node in self:
result.append(node.__class__.__name__ + " {}".format(node))
return result
# -------------------------------------------------------------------------
# Traversal
# -------------------------------------------------------------------------
[docs]
def reset_visited(self):
"""Reset the 'visited' flag, which is used to detect
loops during traversals
"""
for tmp in self:
tmp.visited = False
[docs]
def set_ids(self, node_id=()):
"""Sequentially number all nodes, and subnodes"""
logger.debug("Setting ids")
# Clear all ids
for node in self:
node.reset_id()
# Reset the visited flag to check for loops
self.reset_visited()
# Get the start node
next_node = self.get_node("1")
# And traverse the nodes.
n = 0
while next_node:
next_node = next_node.set_id((*node_id, str(n)))
n += 1
logger.debug("Finished setting ids")
[docs]
def create_parsers(self):
"""Create the argument parsers for the nodes."""
logger.debug("Creating argument parsers.")
# Reset the visited flag to check for loops
self.reset_visited()
# Get the start node
next_node = self.get_node("1")
# And traverse the nodes.
while next_node:
next_node = next_node.create_parser()
logger.debug("Finished creating argument parsers.")
[docs]
def set_log_level(self, options):
"""Set the log level for each node based on the options"""
logger.debug("Setting the log-level")
for node in self:
step_type = node.step_type
logger.debug(f" checking for node type {step_type}")
if step_type in options and "log_level" in options[step_type]:
logger.debug(f" log_level = {options[step_type]['log_level']}")
try:
node.logger.setLevel(options[step_type]["log_level"])
logger.debug(" set!")
except Exception as e:
print(f"Exception {type(e)}: {e}")
logger.debug("Finished setting node log levels.")
# -------------------------------------------------------------------------
# Strings, reading and writing
# -------------------------------------------------------------------------
[docs]
def digest(self, strict=False):
"""Generate a unique hash key for this flowchart.
Parameters
----------
strict: bool
Whether to include version information. Default: False
Returns
-------
string
"""
hasher = hashlib.sha256()
# Hash the nodes and edges in the graph by traversing the graph
# Reset the visited flag to check for loops
self.reset_visited()
# Get the start node
next_node = self.get_node("1")
# And traverse the nodes.
while next_node:
if next_node.visited:
break
next_node.visited = True
hasher.update(bytes(next_node.digest(strict=strict), "utf-8"))
next_node = next_node.next()
return hasher.hexdigest()
[docs]
def to_json(self):
"""Ufff. Turn ourselves into JSON"""
return json.dumps(self.to_dict())
[docs]
def to_dict(self):
"""Serialize the graph and everything it contains in a dict"""
data = {
"item": "object",
"module": self.__module__,
"class": self.__class__.__name__,
"extension": None,
}
nodes = data["nodes"] = []
for node in self:
nodes.append(node.to_dict())
edges = data["edges"] = []
for edge in self.graph.edges():
attr = {}
for key in edge:
if key not in ("node1", "node2", "edge_type", "edge_subtype"):
attr[key] = edge[key]
edges.append(
{
"item": "edge",
"node1": edge.node1.uuid,
"node2": edge.node2.uuid,
"edge_type": edge.edge_type,
"edge_subtype": edge.edge_subtype,
"attributes": attr,
}
)
return data
[docs]
def from_dict(self, data):
"""recreate the flowchart from the dict. Inverse of to_dict."""
if "class" not in data:
raise RuntimeError("There is no class information in the data!")
if data["class"] != self.__class__.__name__:
raise RuntimeError(
"The dictionary does not contain a flowchart!"
" It contains a {} class".format(data["class"])
)
self.clear()
# Recreate the nodes
for node in data["nodes"]:
if node["class"] == "StartNode":
new_node = self.get_node("1")
new_node.from_dict(node)
continue
logger.debug("creating {} node".format(node["extension"]))
plugin = self.plugin_manager.get(node["extension"])
logger.debug(" plugin object: {}".format(plugin))
# Recreate the node
new_node = plugin.create_node(flowchart=self, extension=node["extension"])
new_node.parent = self.parent
# set uuid to correct value
new_node._uuid = node["attributes"]["_uuid"]
# and add to the flowchart
self.add_node(new_node)
new_node.from_dict(node)
logger.debug("adding nodes: nodes:\n\t" + "\n\t".join(self.list_nodes()))
# and the edges connecting them
for edge in data["edges"]:
node1 = self.get_node(edge["node1"])
node2 = self.get_node(edge["node2"])
self.add_edge(
node1,
node2,
edge_type=edge["edge_type"],
edge_subtype=edge["edge_subtype"],
**edge["attributes"],
)
logger.debug("Adding edges, nodes:\n\t" + "\n\t".join(self.list_nodes()))
[docs]
def write(self, filename):
"""Write the serialized form to disk"""
with open(filename, "w") as fd:
fd.write(self.to_text())
logger.info(f"Wrote flowchart to {filename}")
permissions = stat.S_IMODE(os.lstat(filename).st_mode)
os.chmod(filename, permissions | stat.S_IXUSR | stat.S_IXGRP)
[docs]
def to_text(self):
"""Return the text for the flowchart.
This is the representation written to disk, submitted
as jobs, etc. There are two header lines followed by json
representing the flowchart.
Returns
-------
str : the text representation.
"""
text = "#!/usr/bin/env run_flowchart\n"
text += "!MolSSI flowchart 2.0\n"
text += "#metadata\n"
self.metadata["sha256"] = self.digest()
self.metadata["sha256_strict"] = self.digest(strict=True)
text += json.dumps(self.metadata, indent=4)
text += "\n"
text += "#flowchart\n"
text += json.dumps(self.to_dict(), indent=4, cls=seamm_util.JSONEncoder)
text += "\n"
text += "#end\n"
return text
[docs]
def from_text(self, text):
"""Recreate the flowchart from text"""
lines = iter(text.splitlines())
line = next(lines)
# There may be exec magic as first line
if line[0:2] == "#!":
line = next(lines)
if line[0:7] != "!MolSSI":
raise RuntimeError("File is not a MolSSI file! -- " + line)
tmp = line.split()
if len(tmp) < 3:
raise RuntimeError("File is not a proper MolSSI file! -- " + line)
if tmp[1] != "flowchart":
raise RuntimeError("File is not a flowchart! -- " + line)
flowchart_version = tmp[2]
version = Version(flowchart_version)
logger.info(f"Reading flowchart version {flowchart_version}")
if version < Version("2.0"):
self.metadata = {}
rest = "\n".join([x for x in lines])
data = json.loads(rest, cls=seamm_util.JSONDecoder)
else:
sections = {}
in_section = False
for line in lines:
if line.strip() == "":
continue
if line[0] == "#":
section = line.strip()[1:]
if section == "end":
in_section = False
else:
tmp = sections[section] = []
in_section = True
continue
elif in_section:
tmp.append(line)
self.metadata = json.loads("\n".join(sections["metadata"]))
data = json.loads("\n".join(tmp), cls=seamm_util.JSONDecoder)
if "class" not in data or data["class"] != "Flowchart":
raise RuntimeError("Text does not contain a flowchart")
self.from_dict(data)
[docs]
def read(self, filename):
"""Recreate the flowchart from the serialized form on disk"""
with open(filename, "r") as fd:
text = fd.read()
self.from_text(text)
# -------------------------------------------------------------------------
# Edges between nodes
# -------------------------------------------------------------------------
[docs]
def edges(self, node=None, direction="both"):
return self.graph.edges(node, direction)
[docs]
def add_edge(self, u, v, edge_type=None, edge_subtype="next", **attr):
return self.graph.add_edge(u, v, edge_type, edge_subtype, **attr)
[docs]
def print_edges(self, event=None):
"""Print all the edges. Useful for debugging!"""
print("All edges in flowchart")
for edge in self.edges():
# print(' {}'.format(edge))
print(
" {} {} {} {} {} {}".format(
edge.node1.tag,
edge.anchor1,
edge.node2.tag,
edge.anchor2,
edge.edge_type,
edge.edge_subtype,
)
)
# -------------------------------------------------------------------------
# Printing
# -------------------------------------------------------------------------