Source code for loop_step.loop

# -*- coding: utf-8 -*-

"""Non-graphical part of the Loop step in a SEAMM flowchart"""

import fnmatch
import logging
from pathlib import Path
import re
import shlex
import shutil
import sys
import traceback

import psutil
import pprint

import loop_step
import seamm
import seamm_util
import seamm_util.printing as printing
from seamm_util.printing import FormattedText as __

# Module logger; also handed to seamm.Node as the step's logger.
logger = logging.getLogger(__name__)
# Top-level printer. Loop.run() temporarily raises the level of its stream/file
# handlers and attaches a per-iteration "iteration.out" file handler to it.
job = printing.getPrinter()
# Printer named "loop", used for this step's own messages.
printer = printing.getPrinter("loop")


class BreakLoop(Exception):
    """Signal that SEAMM should leave the loop now.

    The Loop step stops iterating when it catches this exception and carries
    on with the step that follows the loop.
    """

    def __init__(self, message: str = "break from the loop") -> None:
        super().__init__(message)
def break_loop():
    """Leave the enclosing loop and continue with the step after it.

    Raises
    ------
    BreakLoop
        Always.
    """
    raise BreakLoop
class ContinueLoop(Exception):
    """Signal that SEAMM should go straight to the next loop iteration.

    The Loop step abandons the rest of the loop body when it catches this
    exception and starts the next iteration.
    """

    def __init__(self, message: str = "continue with next iteration of loop") -> None:
        super().__init__(message)
def continue_loop():
    """Abandon the rest of this iteration and move on to the next one.

    Raises
    ------
    ContinueLoop
        Always.
    """
    raise ContinueLoop
class SkipIteration(Exception):
    """Signal that SEAMM should discard the current loop iteration entirely.

    The Loop step moves on to the next iteration when it catches this
    exception, like ContinueLoop, and additionally deletes the directory of
    the skipped iteration.
    """

    def __init__(self, message: str = "skip iteration of loop") -> None:
        super().__init__(message)
def skip_iteration():
    """Drop this iteration completely, including its directory and files.

    Raises
    ------
    SkipIteration
        Always.
    """
    raise SkipIteration
class Loop(seamm.Node):
    """The non-graphical part of the Loop step in a SEAMM flowchart.

    The loop body is the chain of nodes reached through the outgoing 'loop'
    edge; the step after the loop is reached through the 'exit' edge. The
    supported loop types are "For", "Foreach", "For rows in table" and
    "For systems in the database".
    """

    def __init__(self, flowchart=None, extension=None):
        """Setup the non-graphical part of the Loop step in a SEAMM flowchart.

        Keyword arguments:
            flowchart: the flowchart containing this step; passed to seamm.Node.
            extension: passed through unchanged to seamm.Node.
        """
        logger.debug("Creating Loop {}".format(self))
        # Table variable and its table, used by "For rows in table" loops.
        self.table_handle = None
        self.table = None
        # Iteration counter, used only by "For" loops.
        self._loop_count = None
        # Current loop value; None whenever the loop is not running.
        self._loop_value = None
        # Number of iterations; sizes the zero-padding of directory names.
        self._loop_length = None
        # Logging handler writing the current iteration's iteration.out.
        self._file_handler = None
        # Directory name overriding the default iter_<value> name, if any.
        self._custom_directory_name = None

        super().__init__(
            flowchart=flowchart, title="Loop", extension=extension, logger=logger
        )

        # This needs to be after initializing subclasses...
        self.parameters = loop_step.LoopParameters()

    @property
    def version(self):
        """The semantic version of this module."""
        return loop_step.__version__

    @property
    def iter_format(self):
        """Zero-padding format spec for default iteration directory names."""
        if self._loop_length is None:
            return "07"
        return f"0{len(str(self._loop_length))}"

    @property
    def git_revision(self):
        """The git version of this module."""
        return loop_step.__git_revision__

    @property
    def working_path(self):
        """The directory of the current iteration of the loop."""
        if self._custom_directory_name is not None:
            return Path(self.directory) / self._custom_directory_name
        return Path(self.directory) / f"iter_{self._loop_value:{self.iter_format}}"

    def describe(self):
        """Write out information about what this node will do"""
        self.visited = True

        # The description
        job.job(__(self.description_text(), indent=self.indent))

        return self.exit_node()

    def description_text(self, P=None):
        """Return a short description of this step.

        Return a nicely formatted string describing what this step will do,
        followed by the descriptions of the steps in the loop body.

        Keyword arguments:
            P: a dictionary of parameter values, which may be variables
                or final values. If None, then the parameters values will
                be used as is.
        """
        if not P:
            P = self.parameters.values_to_dict()

        loop_type = P["type"]
        if loop_type == "For":
            subtext = "For {variable} from {start} to {end} by {step}\n"
        elif loop_type == "Foreach":
            if self.is_expr(P["values"]):
                subtext = f"Foreach {P['variable']} in {P['values']}\n"
            else:
                if isinstance(P["values"], str):
                    values = [str(v) for v in shlex.split(P["values"])]
                else:
                    values = [str(v) for v in P["values"]]
                # Abbreviate long lists as the first six values, an ellipsis and
                # the last value. Only do so when at least one value is hidden,
                # otherwise the last value would be repeated or "..." would
                # stand for nothing.
                if len(values) > 7:
                    values = [*values[:6], "...", values[-1]]
                tmp = ", ".join(values)
                if len(tmp) < 50:
                    subtext = f"Foreach {P['variable']} in {tmp}\n"
                else:
                    tmp = "\n    ".join(values)
                    subtext = f"Foreach {P['variable']} in\n    {tmp}\n"
        elif loop_type == "For rows in table":
            subtext = "For rows in table {table}\n"
        elif loop_type == "For systems in the database":
            subtext = "For system in the database\n"
        else:
            subtext = "Loop type defined by {type}\n"

        text = self.header + "\n" + str(__(subtext, **P, indent=4 * " "))

        # Print the body of the loop
        join_node = self.previous()
        next_node = self.loop_node()
        while next_node is not None and next_node != join_node:
            text += "\n\n"
            text += str(__(next_node.description_text(), indent=4 * " ", wrap=False))
            next_node = next_node.next()

        return text

    def run(self):
        """Run a Loop step.

        Each iteration runs the nodes of the loop body in a directory of its
        own, with most printed output redirected to ``iteration.out`` there.
        Errors raised by the body are handled according to the "errors"
        parameter. Whichever way the loop ends, the loop state, the loop index
        variables and the output handlers are restored.

        Returns
        -------
        seamm.Node or None
            The node after the loop, or None if the loop is the last step.
        """
        # If the loop is empty, just go on
        if self.loop_node() is None:
            return self.exit_node()

        # Set up the directory, etc.
        super().run()

        P = self.parameters.current_values_to_dict(
            context=seamm.flowchart_variables._data
        )
        loop_type = P["type"]

        # Reset variables to initial state.
        self._custom_directory_name = None

        # Print out header to the main output
        printer.important(__(self.description_text(P), indent=self.indent))

        # (handler, original level) for every output handler quieted below
        raised_handlers = []
        try:
            # Set up some unchanging variables
            if loop_type == "For":
                start, start_is_int = self._as_number(P["start"])
                step, step_is_int = self._as_number(P["step"])
                end, end_is_int = self._as_number(P["end"])
                if step == 0:
                    raise ValueError(
                        f"The loop over {P['variable']} has a step of zero."
                    )
                integers = start_is_int and step_is_int and end_is_int
                if integers:
                    # Width of the zero-padded directory names: that of the
                    # end value when counting up, of the start when counting down.
                    width = len(str(end if step > 0 else start))
                    fmt = f"0{width}d"

                if self._loop_value is None:
                    self.logger.info(
                        "For {} from {} to {} by {}".format(
                            P["variable"], P["start"], P["end"], P["step"]
                        )
                    )

                    self.logger.info("Initializing loop")
                    self._loop_count = 0
                    self._loop_value = start
                    self.set_variable(P["variable"], self._loop_value)

                    # Count the iterations by stepping exactly as the loop will,
                    # since range() doesn't work for nonintegers.
                    count = 0
                    probe = start
                    if step > 0:
                        while probe <= end:
                            count += 1
                            probe += step
                    else:
                        while probe >= end:
                            count += 1
                            probe += step
                    self._loop_length = count
                    self._announce_length()

                    self._push_loop_index(self._loop_value)
                    self.set_variable("_loop_index", self._loop_value)
            elif loop_type == "Foreach":
                foreach_values = self._foreach_values(P["values"])
                if self._loop_value is None:
                    self._loop_value = 0
                    self._loop_length = len(foreach_values)
                    self._announce_length()
                    self._push_loop_index(None)
            elif loop_type == "For rows in table":
                if self._loop_value is None:
                    self.table_handle = self.get_variable(P["table"])
                    self.table = self.table_handle["table"]
                    self.table_handle["loop index"] = True

                    self.logger.info(
                        "Initialize loop over {} rows in table {}".format(
                            self.table.shape[0], P["table"]
                        )
                    )

                    self._loop_value = 0
                    self._loop_length = self.table.shape[0]
                    self._announce_length()
                    self._push_loop_index(None)

                where = P["where"]
                if where == "Use all rows":
                    pass
                elif where == "Select rows where column":
                    column = P["query-column"]
                    op = P["query-op"]
                    query_value = P["query-value"]
                    if self.table.shape[0] > 0:
                        row = self.table.iloc[0]
                        self.logger.debug(f"Row is\n{pprint.pformat(row)}")
                        if column not in row:
                            # Fall back to a case-insensitive match of the name
                            for key in row.keys():
                                if isinstance(key, str) and column.lower() == key.lower():
                                    column = key
                                    break
                        if column not in row:
                            raise ValueError(
                                f"Looping over table with criterion on column '{column}': "
                                "that column does not exist."
                            )
                else:
                    raise NotImplementedError(f"Loop cannot handle '{where}'")
            elif loop_type == "For systems in the database":
                configurations = self._select_configurations(P)
                if self._loop_value is None:
                    self._loop_value = 0
                    self._loop_length = len(configurations)
                    self._announce_length()
                    self._push_loop_index(None)
            else:
                raise NotImplementedError(f"Loop type '{loop_type}' is not supported")

            # Remove any redirection of printing left over from a previous run.
            self._close_file_handler()

            # Raise every stream/file handler of the job printer (e.g. job.out)
            # to the JOB level for the duration of the loop. FileHandler is a
            # subclass of StreamHandler, so file handlers are included. Each
            # handler's own level is remembered so all of them can be restored.
            for handler in job.handlers:
                if isinstance(handler, logging.StreamHandler):
                    raised_handlers.append((handler, handler.level))
                    handler.setLevel(printing.JOB)

            # Cycle through the iterations
            next_node = self
            while next_node is not None:
                if next_node is self:
                    next_node = self.loop_node()

                    if loop_type == "For":
                        self._loop_count += 1
                        if self._loop_count > 1:
                            self._loop_value += step
                            self.set_variable(P["variable"], self._loop_value)

                        # For integer loops, we can use the value for the
                        # directory names
                        if integers:
                            self._custom_directory_name = (
                                f"iter_{self._loop_value:{fmt}}"
                            )

                        # Set up the index variables
                        self._set_loop_index(self._loop_value)

                        # See if we are at the end of loop
                        if self._loop_count > self._loop_length:
                            self._finish_loop(P)
                            self.logger.info(
                                f"The loop over {P['variable']} from {start} to "
                                f"{end} by {step} finished successfully"
                            )
                            break

                        self.logger.info(" Loop value = {}".format(self._loop_value))
                    elif loop_type == "Foreach":
                        self.logger.debug(f"Foreach {P['variable']} in {P['values']}")
                        self._loop_value += 1

                        if self._loop_value > self._loop_length:
                            self._finish_loop(P)
                            self.logger.info("The loop over value finished successfully")
                            # return the next node after the loop
                            break

                        item = foreach_values[self._loop_value - 1]
                        self.set_variable(P["variable"], item)

                        # Set up the index variables
                        self._set_loop_index(self._loop_value)
                        self.logger.info(" Loop value = {}".format(item))
                    elif loop_type == "For rows in table":
                        # Advance to the next row satisfying the query, if any
                        while True:
                            self._loop_value += 1

                            if self._loop_value > self.table.shape[0]:
                                break

                            if where == "Use all rows":
                                break

                            row = self.table.iloc[self._loop_value - 1]
                            self.logger.debug(f"Query {row[column]} {op} {query_value}")
                            if self._row_matches(row[column], op, query_value):
                                break

                        if self._loop_value > self.table.shape[0]:
                            table_name = self.parameters["table"].value
                            self._finish_loop(P)
                            self.logger.info(
                                "The loop over table "
                                + table_name
                                + " finished successfully"
                            )
                            # return the next node after the loop
                            break

                        # Set up the index variables
                        index = self.table.index[self._loop_value - 1]
                        self.logger.debug(" _loop_value = {}".format(self._loop_value))
                        self.logger.debug(
                            " _loop_indices = {}".format(
                                self.get_variable("_loop_indices")
                            )
                        )
                        self._set_loop_index(index)
                        self.logger.debug(
                            " --> {}".format(self.get_variable("_loop_indices"))
                        )
                        self.table_handle["current index"] = index

                        row = self.table.iloc[self._loop_value - 1]
                        self.set_variable("_row", row)
                        self.logger.debug(" _row = {}".format(row))
                    elif loop_type == "For systems in the database":
                        self._loop_value += 1

                        if self._loop_value > self._loop_length:
                            self._finish_loop(P)
                            self.logger.info("The loop over value finished successfully")
                            # return the next node after the loop
                            break

                        # Set the default system and configuration
                        configuration = configurations[self._loop_value - 1]
                        system_db = configuration.system_db
                        system = configuration.system
                        system_db.system = configuration.system
                        system.configuration = configuration

                        directory_choice = P["directory name"]
                        if directory_choice == "system name":
                            self._custom_directory_name = self.safe_filename(
                                system.name
                            )
                        elif directory_choice == "configuration name":
                            self._custom_directory_name = self.safe_filename(
                                configuration.name
                            )
                        else:
                            self._custom_directory_name = None

                        # Set up the index variables
                        self._set_loop_index(self._loop_value)

                        self.logger.info(f"       system = {system.name}")
                        self.logger.info(f"configuration = {configuration.name}")

                    # Direct most output to iteration.out
                    iter_dir = self.working_path
                    iter_dir.mkdir(parents=True, exist_ok=True)

                    self._close_file_handler()
                    path = iter_dir / "iteration.out"
                    path.unlink(missing_ok=True)
                    self._file_handler = logging.FileHandler(path)
                    self._file_handler.setLevel(printing.NORMAL)
                    self._file_handler.setFormatter(
                        logging.Formatter(fmt="{message:s}", style="{")
                    )
                    job.addHandler(self._file_handler)

                    # Add the iteration to the ids so the directory structure is
                    # reasonable
                    self.flowchart.reset_visited()
                    self.set_subids((*self._id, iter_dir.name))

                # Run through the steps in the loop body
                try:
                    next_node = next_node.run()
                except DeprecationWarning as e:
                    # NOTE(review): next_node is not advanced here, so the node
                    # that raised is run again on the next pass -- confirm this
                    # is intended.
                    printer.normal("\nDeprecation warning: " + str(e))
                    traceback.print_exc(file=sys.stderr)
                    traceback.print_exc(file=sys.stdout)
                except BreakLoop:
                    break
                except ContinueLoop:
                    next_node = self
                except SkipIteration:
                    next_node = self
                    # Stop writing iteration.out before deleting its directory.
                    self._close_file_handler()
                    shutil.rmtree(iter_dir)
                except Exception as e:
                    printer.job(
                        f"Caught exception in loop iteration {iter_dir.name}: {str(e)}"
                    )
                    with open(iter_dir / "stderr.out", "a") as fd:
                        traceback.print_exc(file=fd)
                    if "continue" in P["errors"]:
                        next_node = self
                    elif "exit" in P["errors"]:
                        break
                    else:
                        raise

                if self.logger.isEnabledFor(logging.DEBUG):
                    p = psutil.Process()
                    self.logger.debug(pprint.pformat(p.open_files()))

                self.logger.debug(f"Bottom of loop {next_node}")
        finally:
            # If the loop was left early (BreakLoop, "exit" on error, an
            # exception, or a body that never returned to the loop), reset the
            # loop state and pop this loop's index so that enclosing loops and
            # later runs of this loop start cleanly.
            if self._loop_value is not None:
                self._finish_loop(P)

            # Remove any redirection of printing and restore the handler levels.
            self._close_file_handler()
            for handler, level in raised_handlers:
                handler.setLevel(level)

        # Return to the normally scheduled step, i.e. fall out of the loop.
        return self.exit_node()

    @staticmethod
    def _as_number(value):
        """Convert a "For" loop bound or step to a number.

        Returns a tuple (number, is_integer); integral values become int.
        Strings are parsed with float(), as are other non-int values.
        """
        if isinstance(value, int):
            return value, True
        number = float(value)
        if number.is_integer():
            return int(number), True
        return number, False

    @staticmethod
    def _foreach_values(values):
        """The items of a "Foreach" loop; a string is split shell-style."""
        if isinstance(values, str):
            return shlex.split(values)
        return values

    @staticmethod
    def _row_matches(cell, op, value):
        """Whether a table cell satisfies the row query ``cell <op> value``.

        The query value is converted to the cell's type only for the
        comparison and containment operators. The emptiness tests ignore it,
        so an empty query value does not break them on numeric columns.

        Raises
        ------
        NotImplementedError
            If ``op`` is not a known query operator.
        """
        if op == "is empty":
            # Might be numpy.nan, and NaN != NaN hence odd test.
            return cell == "" or cell != cell
        if op == "is not empty":
            return cell != "" and cell == cell
        if op == "contains regexp":
            return re.search(value, cell) is not None
        if op == "does not contain regexp":
            return re.search(value, cell) is None

        typed = type(cell)(value)
        if op == "==":
            return cell == typed
        if op == "!=":
            return cell != typed
        if op == ">":
            return cell > typed
        if op == ">=":
            return cell >= typed
        if op == "<":
            return cell < typed
        if op == "<=":
            return cell <= typed
        if op == "contains":
            return typed in cell
        if op == "does not contain":
            return typed not in cell
        raise NotImplementedError(f"Loop query '{op}' not implemented")

    def _select_configurations(self, P):
        """The configurations visited by a "For systems in the database" loop.

        Raises
        ------
        RuntimeError
            If the system-name or configuration selection method is unknown.
        """
        system_db = self.get_variable("_system_db")
        systems = system_db.systems

        # Filter on system names
        choice = P["where system name"]
        if choice == "is anything":
            pass
        elif choice == "is":
            name = P["system name"]
            systems = [s for s in systems if s.name == name]
        elif choice == "matches":
            pattern = P["system name"]
            systems = [s for s in systems if fnmatch.fnmatch(s.name, pattern)]
        elif choice == "regexp":
            pattern = P["system name"]
            systems = [s for s in systems if re.search(pattern, s.name) is not None]
        else:
            raise RuntimeError(f"Matching system names by '{choice}' is not supported")

        # Only systems with at least one configuration can contribute any.
        systems = [s for s in systems if s.n_configurations > 0]

        # Finally, allow only the requested configurations
        choice = P["default configuration"]
        if choice == "last" or choice == "-1":
            return [s.configurations[-1] for s in systems]
        if choice == "first" or choice == "1":
            return [s.configurations[0] for s in systems]
        if choice == "all":
            return [c for s in systems for c in s.configurations]
        if choice == "name is":
            name = P["configuration name"]
            return [c for s in systems for c in s.configurations if c.name == name]
        if choice == "matches":
            pattern = P["configuration name"]
            return [
                c
                for s in systems
                for c in s.configurations
                if fnmatch.fnmatch(c.name, pattern)
            ]
        if choice == "regexp":
            pattern = P["configuration name"]
            return [
                c
                for s in systems
                for c in s.configurations
                if re.search(pattern, c.name) is not None
            ]
        raise RuntimeError(f"Selecting configurations by '{choice}' is not supported")

    def _announce_length(self):
        """Print the number of iterations to the step's output."""
        printer.important(
            __(
                f"The loop will have {self._loop_length} iterations.\n\n",
                indent=self.indent + 4 * " ",
            )
        )

    def _push_loop_index(self, value):
        """Append this loop's entry to the ``_loop_indices`` variable."""
        if self.variable_exists("_loop_indices"):
            indices = self.get_variable("_loop_indices")
            self.set_variable("_loop_indices", (*indices, value))
        else:
            self.set_variable("_loop_indices", (value,))

    def _set_loop_index(self, value):
        """Set this loop's (innermost) entry of the loop index variables."""
        indices = self.get_variable("_loop_indices")
        self.set_variable("_loop_indices", (*indices[:-1], value))
        self.set_variable("_loop_index", value)

    def _pop_loop_index(self):
        """Revert the loop index variables to the next outer loop, or remove them."""
        if not self.variable_exists("_loop_indices"):
            return
        indices = self.get_variable("_loop_indices")
        if len(indices) <= 1:
            self.delete_variable("_loop_indices")
            # Non-"For" loops only set _loop_index once an iteration starts.
            if self.variable_exists("_loop_index"):
                self.delete_variable("_loop_index")
        else:
            self.set_variable("_loop_indices", indices[:-1])
            self.set_variable("_loop_index", indices[-2])

    def _finish_loop(self, P):
        """Reset the loop state once the loop has ended, normally or not."""
        self._loop_value = None
        self._loop_length = None
        self._loop_count = None
        self._custom_directory_name = None
        self._pop_loop_index()

        if P["type"] == "For rows in table":
            if self.variable_exists("_row"):
                self.delete_variable("_row")
            # and the other info in the table handle
            if self.table_handle is not None:
                self.table_handle["loop index"] = False
            self.table = None
            self.table_handle = None

    def _close_file_handler(self):
        """Detach and close the handler writing iteration.out, if any."""
        if self._file_handler is not None:
            job.removeHandler(self._file_handler)
            self._file_handler.close()
            self._file_handler = None

    def default_edge_subtype(self):
        """Return the default subtype of the edge.

        Usually this is 'next' but for nodes with two or more edges
        leaving them, such as a loop, this method will return an
        appropriate default for the current edge. For example, by
        default the first edge emanating from a loop-node is the
        'loop' edge; the second, the 'exit' edge.

        A return value of 'too many' indicates that the node exceeds
        the number of allowed exit edges.
        """
        # how many outgoing edges are there?
        n_edges = len(self.flowchart.edges(self, direction="out"))

        self.logger.debug(f"loop.default_edge_subtype, n_edges = {n_edges}")

        if n_edges == 0:
            return "loop"
        elif n_edges == 1:
            return "exit"
        else:
            return "too many"

    def create_parser(self):
        """Setup the command-line / config file parser"""
        parser_name = "loop-step"
        parser = seamm_util.getParser()

        # Remember if the parser exists ... this type of step may have been
        # found before
        parser_exists = parser.exists(parser_name)

        # Create the standard options, e.g. log-level
        super().create_parser(name=parser_name)

        if not parser_exists:
            # Any options for loop itself
            pass

        # Now need to walk through the steps in the loop...
        for edge in self.flowchart.edges(self, direction="out"):
            if edge.edge_subtype == "loop":
                self.logger.debug("Loop, first node of loop is: {}".format(edge.node2))
                next_node = edge.node2
                while next_node and next_node != self:
                    next_node = next_node.create_parser()

        return self.exit_node()

    def set_id(self, node_id=()):
        """Sequentially number the loop subnodes"""
        self.logger.debug("Setting ids for loop {}".format(self))
        if self.visited:
            return None

        self.visited = True
        self._id = node_id
        self.set_subids(self._id)
        return self.exit_node()

    def set_subids(self, node_id=()):
        """Set the ids of the nodes in the loop"""
        next_node = self.loop_node()
        n = 0
        while next_node and next_node != self:
            next_node = next_node.set_id((*node_id, str(n)))
            n += 1

    def exit_node(self):
        """The next node after the loop, if any"""
        for edge in self.flowchart.edges(self, direction="out"):
            if edge.edge_subtype == "exit":
                self.logger.debug(f"Loop, node after loop is: {edge.node2}")
                return edge.node2

        # loop is the last node in the flowchart
        self.logger.debug("There is no node after the loop")
        return None

    def loop_node(self):
        """The first node in the loop body"""
        for edge in self.flowchart.edges(self, direction="out"):
            if edge.edge_subtype == "loop":
                self.logger.debug(f"Loop, first node in loop is: {edge.node2}")
                return edge.node2

        # There is no body of the loop!
        self.logger.debug("There is no loop body")
        return None

    def safe_filename(self, filename):
        """Turn ``filename`` into a directory name unused in this step's directory.

        Characters that are troublesome in file names are replaced by '-', and
        a suffix _2, _3, ... is appended if the name already exists.
        """
        clean = re.sub(r"[/\\?%*:|\"<>\x7F\x00-\x1F]", "-", filename)

        # Check for duplicates...
        path = Path(self.directory) / clean
        count = 1
        while path.exists():
            count += 1
            path = Path(self.directory) / f"{clean}_{count}"

        return path.name