# -*- coding: utf-8 -*-
"""The flowchart is a visual representation of a flowchart
drawn on a Tk canvas. The nodes of the graph are shown as
ovals, rectangles, etc. with the edges indicated by arrows
from one node to another.
The outline of a node has the following Tk tags: ::
node=xxxxx type=outline
The title: ::
node=xxxxx type=title
When the mouse is over the node, the anchor points are activated.
They have the following tags: ::
node=xxxxx type=anchor anchor=<point>
When the mouse is over one of the active anchor points, it is
covered with a larger circle, with tags: ::
node=xxxxx type=active_anchor anchor=<point>
Edges are indicated by directional arrows between nodes. The
arrows have the following tags: ::
edge=xxxxx type=arrow
When the mouse if over an arrow, it is shown to be active by placing
two red squares on the base and head of the arrow: ::
type=arrow_base arrow=<item> edge=xxxxx
type=arrow_head arrow=<item> edge=xxxxx
Clicking on either of these allows dragging the head or tail to
another anchor point on the same or another node (but not on the
same node as the tail/head for head/tail!). If the arrow is dropped
anywhere else it just snaps back to its original place.
"""
import copy
import logging
import math
import pkg_resources
import pprint # nopep8
import sys
import tkinter as tk
import tkinter.filedialog as tk_filedialog
import tkinter.ttk as ttk
from PIL import ImageTk, Image
import seamm
from .tk_open import TkOpen
from .tk_publish import TkPublish
logger = logging.getLogger(__name__)
[docs]
def grey(value):
return 255 - (255 - value) * 0.1
[docs]
class TkFlowchart(object):
def __init__(self, master=None, flowchart=None, namespace="org.molssi.seamm.tk"):
"""Initialize a Flowchart object
Keyword arguments:
"""
self.toplevel = None
self.master = master
self._flowchart = flowchart
self.filename = None
self._stack = []
self.graph = seamm.Graph()
# Setup the plugin handling
self.plugin_manager = seamm.PluginManager(namespace)
# Job handler
self._job_handler = seamm.TkJobHandler()
self.canvas_width = 500
self.canvas_height = 500
self.grid_x = 300 # Width of the columns for the step display
self.grid_y = 70 # Height of the rows for the step display
self.gap = 50
self.halo = 8 # How many pixels to consider 'near'
self.data = None
self._x0 = None
self._y0 = None
self.selection = None
self.active_nodes = []
self.in_callback = False
self.canvas_after_callback = None
self.popup_menu = None
# Create the panedwindow
self.pw = tk.PanedWindow(self.master, orient=tk.HORIZONTAL)
self.pw.pack(fill=tk.BOTH, expand=1)
# On the left put the tree of nodes
self.tree = ttk.Treeview(self.pw)
self.pw.add(self.tree)
for group in self.plugin_manager.groups():
self.tree.insert("", "end", group, text=group)
for plugin in self.plugin_manager.plugins(group):
self.tree.insert(group, "end", plugin, text=plugin, tags="node")
self.tree.tag_bind(
"node", sequence="<ButtonPress-1>", callback=self.create_node
)
# and the main canvas with scrollbars next to the right
self.canvas_frame = ttk.Frame(self.pw)
self.canvas_frame.grid_rowconfigure(0, weight=1)
self.canvas_frame.grid_columnconfigure(0, weight=1)
self.xscrollbar = tk.Scrollbar(self.canvas_frame, orient=tk.HORIZONTAL)
self.xscrollbar.grid(row=1, column=0, sticky=tk.EW)
self.yscrollbar = tk.Scrollbar(self.canvas_frame)
self.yscrollbar.grid(row=0, column=1, sticky=tk.NS)
self.canvas = tk.Canvas(
self.canvas_frame,
width=self.canvas_width,
height=self.canvas_height,
xscrollcommand=self.xscrollbar.set,
yscrollcommand=self.yscrollbar.set,
scrollregion=(0, 0, 2000, 2000),
)
self.canvas.grid(row=0, column=0, sticky=tk.NSEW)
self.xscrollbar.config(command=self.xview)
self.yscrollbar.config(command=self.yview)
self.pw.add(self.canvas_frame)
# Set up scrolling on the canvas with the mouse scrollwheel or similar
self.canvas.bind("<Enter>", self._bound_to_mousewheel)
self.canvas.bind("<Leave>", self._unbound_to_mousewheel)
# background image
filepath = pkg_resources.resource_filename(__name__, "data/SEAMM.png")
logger.info(filepath)
self.image = Image.open(filepath)
self.prepared_image = Image.eval(self.image.convert("RGB"), grey)
w, h = self.image.size
r_w = self.canvas_width / w
r_h = self.canvas_height / h
factor = r_w if r_w < r_h else r_h
w = int(factor * w)
h = int(factor * h)
self.working_image = self.prepared_image.resize((w, h))
self.photo = ImageTk.PhotoImage(self.working_image)
# self.background = self.canvas.create_image(
# self.canvas_width / 2,
# self.canvas_height / 2,
# image=self.photo,
# anchor='center')
self.background = self.canvas.create_image(
0, 0, image=self.photo, anchor="center"
)
# The gui partner for the start node...
self.create_start_node()
# Set up the bindings
self.canvas.bind("<Configure>", self.canvas_configure)
self.canvas.bind("<Motion>", self.mouse_motion)
self.canvas.bind("<ButtonPress-1>", self.click)
self.canvas.bind("<Double-ButtonPress-1>", self.double_click)
if sys.platform.startswith("darwin"):
self.canvas.bind("<ButtonPress-2>", self.right_click)
else:
self.canvas.bind("<ButtonPress-3>", self.right_click)
logger.debug("Finished initializing tk_flowchart")
def __iter__(self):
return self.graph.__iter__()
@property
def flowchart(self):
"""The flowchart, which holds the nodes"""
return self._flowchart
@flowchart.setter
def flowchart(self, value):
self._flowchart = value
@property
def master(self):
"""The window that is our master"""
return self._master
@master.setter
def master(self, value):
if value:
self.toplevel = value.winfo_toplevel()
self._master = value
[docs]
def tag_exists(self, tag):
"""Check if the node with a given tag exists"""
for node in self:
if node.tag == tag:
return True
return False
[docs]
def get_node(self, tag):
"""Return the node with a given tag"""
if isinstance(tag, int):
tag = str(tag)
for node in self:
if str(node.uuid) == tag:
return node
return None
[docs]
def last_node(self, tk_node="1"):
"""Find the last node walking down the main execution path
from the given node, which defaults to the start node"""
logger.debug("Finding last node")
# Handle loops!
for node in self:
node.node.visited = False
logger.debug(
" reset visited {} {} = {}".format(
node.node.visited, node.title, node
)
)
return self.last_node_helper(tk_node)
[docs]
def last_node_helper(self, tk_node):
"""Helper routine to handle the recursion"""
# get the node to start the traversal
if isinstance(tk_node, str):
tk_node = self.get_node(tk_node)
logger.debug(
" last tk_node helper: {} {} = {}".format(
tk_node.node.visited, tk_node.title, tk_node
)
)
tk_node.node.visited = True
next_tk_node = None
for edge in self.graph.edges(tk_node, direction="out"):
if edge.edge_type == "execution":
if edge.node2.node.visited:
logger.debug(
"\ttk_node {} {} has been visited".format(
edge.node2.title, edge.node2
)
)
next_tk_node = edge.node2
else:
logger.debug(
"\trecursing to tk_node {} {}".format(
edge.node2.title, edge.node2
)
)
return self.last_node_helper(edge.node2)
if next_tk_node is not None:
tk_node = next_tk_node
logger.debug(
"\tchecking visited tk_node {} {} for new nodes".format(
tk_node.title, tk_node
)
)
if tk_node.node.extension == "Join":
logger.debug("\t tk_node is a join node, so look at next")
for edge in self.graph.edges(tk_node, direction="out"):
if edge.edge_type == "execution":
logger.debug(
"\ttk_node after join node is {} {}".format(
edge.node2.title, edge.node2
)
)
tk_node = edge.node2
for edge in self.graph.edges(tk_node, direction="out"):
if edge.edge_type == "execution":
if edge.node2.node.visited:
logger.debug(
"\tnode {} {} has been visited".format(
edge.node2.title, edge.node2
)
)
else:
logger.debug(
"\trecursing to tk_node {} {}".format(
edge.node2.title, edge.node2
)
)
return self.last_node_helper(edge.node2)
logger.debug("\treturning {} {}".format(tk_node.title, tk_node))
return tk_node
[docs]
def add_edge(self, u, v, edge_type="execution", edge_subtype="next", **kwargs):
edge = self.graph.add_edge(
u,
v,
edge_type,
edge_subtype,
edge_class=seamm.TkEdge,
canvas=self.canvas,
**kwargs,
)
edge.draw()
return edge
[docs]
def edges(self, node=None, direction="both"):
return self.graph.edges(node, direction)
[docs]
def new_file(self, event=None):
self.filename = None
self.clear()
# Reset the metadata
self.flowchart.reset_metadata()
[docs]
def help(self, event=None):
print("Help!!!!")
[docs]
def debug(self, event):
print(event)
[docs]
def open_file(self, event=None):
filename = tk_filedialog.askopenfilename(defaultextension=".flow")
if filename == "":
return
self.filename = filename
self.open(filename)
[docs]
def open(self, filename):
if isinstance(filename, list):
filename = filename[0]
self.flowchart.read(filename)
self.from_flowchart()
self.filename = filename
[docs]
def flowchart_search(self, event=None):
"""Open a flowchart from Zenodo."""
opener = TkOpen(self.toplevel)
data = opener.open()
if data is not None:
self.flowchart.from_text(data["data"])
if "path" in data:
self.filename = data["path"]
else:
self.filename = None
self.from_flowchart()
[docs]
def clear(self, all=False):
"""Clear our graphics"""
for item in self.canvas.find_all():
if item != self.background:
self.canvas.delete(item)
# and the graph
self.graph.clear()
# recreate the start node
if not all:
self.create_start_node()
self.draw()
[docs]
def create_start_node(self):
"""Create the start node"""
# Check if the start node exists
start_node = self.flowchart.get_node("1")
if start_node is None:
start_node = seamm.StartNode()
tk_start_node = seamm.TkStartNode(
tk_flowchart=self,
canvas=self.canvas,
node=start_node,
x=self.grid_x / 2,
y=self.grid_y / 2,
)
self.graph.add_node(tk_start_node)
logger.debug(
"Created start node {} at {}, {}".format(
tk_start_node, start_node.x, start_node.y
)
)
return tk_start_node
[docs]
def properties(self):
"""Get and set the properties of the flowchart."""
start_node = self.get_node("1")
start_node.edit()
[docs]
def publish(self, event=None):
"""Publish the flowchart to a repository such as Zenodo."""
self.update_flowchart()
publisher = TkPublish(self)
publisher.edit()
[docs]
def save(self, event=None):
if self.filename is None:
self.save_file()
else:
self.update_flowchart()
self.flowchart.write(self.filename)
self.flowchart.clear()
[docs]
def save_file(self, event=None):
filename = tk_filedialog.asksaveasfilename(defaultextension=".flow")
if filename != "":
# suffixes = pathlib.Path(filename).suffixes
# if len(suffixes) == 0 or '.flow' not in suffixes:
# filename = filename + '.flow'
self.filename = filename
self.save()
[docs]
def about(self, text="In about"):
print(text)
[docs]
def preferences(self):
print("In preferences")
[docs]
def draw(self):
for tk_node in self:
tk_node.draw()
[docs]
def click(self, event):
"""Handle a left-click on the canvas by finding out what the
mouse is on/in/near and doing the appropriate thing, such as
selecting to preparing to move the item.
"""
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
items = self.canvas.find_closest(cx, cy, self.halo)
self.selection = []
for item in items:
tags = self.get_tags(item)
if "type" in tags and (
tags["type"] == "arrow_base" or tags["type"] == "arrow_head"
):
arrow = int(tags["arrow"])
xys = self.canvas.coords(item)
x0 = xys[0]
y0 = xys[1]
x1 = xys[-2]
y1 = xys[-1]
self.data = self.get_tags(arrow)
self.data["arrow"] = arrow
self.data.update(self.get_tags(arrow))
self.data["x0"] = x0
self.data["y0"] = y0
self.data["x1"] = x1
self.data["y1"] = y1
self._x0 = cx
self._y0 = cy
logger.debug("self.data for dragging arrow head or base")
logger.debug("{}".format(self.data))
if tags["type"] == "arrow_base":
self.data["arrow_base"] = item
self.data["arrow_head"] = self.canvas.find_withtag(
"type=arrow_head"
)
self.mouse_op = "drag arrow base"
self.canvas.bind("<B1-Motion>", self.drag_arrow_base)
self.canvas.bind("<ButtonRelease-1>", self.drop_arrow_base)
else:
self.data["arrow_base"] = self.canvas.find_withtag(
"type=arrow_base"
)
self.data["arrow_head"] = item
self.mouse_op = "drag arrow head"
self.canvas.bind("<B1-Motion>", self.drag_arrow_head)
self.canvas.bind("<ButtonRelease-1>", self.drop_arrow_head)
if "node" in tags:
node = tags["node"]
if tags["type"] == "active_anchor":
# Connecting from an anchor to another node
x, y = node.anchor_point(tags["anchor"])
self.mouse_op = "Connect"
arrow = self.canvas.create_line(
x, y, cx, cy, arrow=tk.LAST, tags="type=active_arrow"
)
self.data = (node, tags["anchor"], x, y, arrow)
self.canvas.bind("<B1-Motion>", self.drag_arrow)
self.canvas.bind("<ButtonRelease-1>", self.drop_arrow)
else:
if node.is_inside(cx, cy, self.halo):
self.selection.append(node)
node.selected = True
self._x0 = cx
self._y0 = cy
self.mouse_op = "Move"
self.canvas.bind("<B1-Motion>", self.move)
self.canvas.bind("<ButtonRelease-1>", self.end_move)
else:
node.selected = False
[docs]
def double_click(self, event):
"""Handle a double-click on the canvas by finding out what the
mouse is on/in/near and doing the appropriate thing.
"""
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
result = self.find_items(cx, cy)
self.selection = []
if result is None:
# Handle a right-click out side anything
print("Double-click outside objects")
return
if result[0] == "node":
node = result[1]
if node.is_inside(cx, cy):
node.double_click(event)
return
if result[0] == "item":
item = result[1]
tags = self.get_tags(item)
if "type" in tags and tags["type"] == "arrow":
self.right_click_on_arrow(event, item, tags)
[docs]
def right_click(self, event):
"""Handle a right-click on the canvas by finding out what the
mouse is on/in/near and doing the appropriate thing, such as
posting an action menu
"""
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
result = self.find_items(cx, cy)
self.selection = []
if result is None:
# Handle a right-click out side anything
print("Right-click outside objects")
return
if result[0] == "node":
node = result[1]
if node.is_inside(cx, cy):
node.right_click(event)
return
if result[0] == "item":
item = result[1]
tags = self.get_tags(item)
if "type" in tags and tags["type"] == "arrow":
self.right_click_on_arrow(event, item, tags)
[docs]
def move(self, event):
"""Move selected items"""
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
deltax = cx - self._x0
deltay = cy - self._y0
self._x0 = cx
self._y0 = cy
for item in self.selection:
item.move(deltax, deltay)
[docs]
def end_move(self, event):
"""End the move of selected items"""
self.canvas.bind("<B1-Motion>", "")
self.canvas.bind("<ButtonRelease-1>", "")
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
deltax = cx - self._x0
deltay = cy - self._y0
for item in self.selection:
item.end_move(deltax, deltay)
item.selected = False
self._x0 = None
self._y0 = None
self.mouse_op = None
self.selection = None
[docs]
def create_node(self, event):
"""Create a node using the type in menu. This is a bit tricky because
we need to create both the node and its graphical partner, each of
which needs to know the other.
"""
# not scrolling the left pane yet
item = self.tree.identify_row(event.y)
plugin_name = self.tree.item(item, option="text")
(last_node, x, y, anchor1, anchor2) = self.next_position()
edge_subtype = last_node.default_edge_subtype()
logger.debug("creating {} node".format(plugin_name))
# The node.
node = self.flowchart.create_node(plugin_name)
# The graphics partner
plugin = self.plugin_manager.get(plugin_name)
logger.debug(" plugin object: {}".format(plugin))
tk_node = plugin.create_tk_node(
tk_flowchart=self, canvas=self.canvas, x=0, y=0, node=node
)
self.graph.add_node(tk_node)
# And figure out where the node should be
# Use the grid approach
x0 = last_node.x
y0 = last_node.y
if anchor1 == "s":
tk_node.x = x0
tk_node.y = y0 + self.grid_y
elif anchor1 == "e":
tk_node.x = x0 + self.grid_x
tk_node.y = y0
else:
dx, dy = tk_node.anchor_point(anchor2)
# flip sign to get direction right
tk_node.x = x - dx
tk_node.y = y - dy
# And connect this to the last node in the existing flowchart,
# which is probably what the user wants.
self.add_edge(
last_node,
tk_node,
"execution",
anchor1=anchor1,
anchor2=anchor2,
edge_subtype=edge_subtype,
)
# And update the picture on screen
self.draw()
[docs]
def remove_node(self, node):
"""Remove the given node"""
# remove the graphical rendering
node.undraw()
# and edges
node.remove_edge("all")
# and the node itself
self.graph.remove_node(node)
[docs]
def next_position(self):
"""Find a reasonable place to position the next step
in the flowchart."""
last_node = self.last_node()
# center of node
x0 = last_node.x
y0 = last_node.y
# Get the anchor point the last node wants to use
anchor1 = last_node.next_anchor()
# and the inverse for the new node
anchor2 = anchor1.translate("".maketrans("news", "swen"))
# Find the point 'gap' past the anchor point of the last
# node, looking from the center (0, 0)
x1, y1 = last_node.anchor_point(anchor1)
dx = x1 - x0
dy = y1 - y0
norm = math.sqrt(dx * dx + dy * dy)
x = x1 + self.gap * (dx / norm)
y = y1 + self.gap * (dy / norm)
return (last_node, x, y, anchor1, anchor2)
[docs]
def mouse_motion(self, event, exclude=()):
"""Track the mouse and highlight the node under the mouse
It appears that the canvas find_closest does not work properly in
Python. Even if you give it a tag to look below, it always returns
the topmost item, so we cannot loop through items.
Instead we use find_overlapping, which does return a list. However,
if the mouse is e.g. inside a rectangle but far enough from the edges
find_overlapping does not find it. In this case we use the current
tag to find the object.
"""
if self.in_callback:
print("IN CALLBACK!!!!!!")
return
self.in_callback = True
result = None
self.canvas.delete("type=active_anchor")
self.canvas.delete("type=arrow_base")
self.canvas.delete("type=arrow_head")
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
active = []
items = self.canvas.find_overlapping(
cx + self.halo / 2,
cy + self.halo / 2,
cx - self.halo / 2,
cy - self.halo / 2,
)
if len(items) == 0:
# If we are within e.g. a rectangle, it may not overlap
# but will be the current item, so if nothing overlaps
# use the current item (if there is one).
items = self.canvas.find_withtag("current")
# Loop backwards since the 'top' item is at the end of the list
# and is probably the item we want.
for item in items[::-1]:
if item in exclude:
continue
tags = self.get_tags(item)
# on an arrow?
if "type" in tags and tags["type"] == "arrow":
xys = self.canvas.coords(item)
x0 = xys[0]
y0 = xys[1]
x1 = xys[-2]
y1 = xys[-1]
self.canvas.create_rectangle(
x0 - self.halo / 2,
y0 - self.halo / 2,
x0 + self.halo / 2,
y0 + self.halo / 2,
tags=["type=arrow_base", "arrow=" + str(item), tags["edge"].tag()],
outline="red",
fill="red",
)
self.canvas.create_rectangle(
x1 - self.halo / 2,
y1 - self.halo / 2,
x1 + self.halo / 2,
y1 + self.halo / 2,
tags=["type=arrow_head", "arrow=" + str(item), tags["edge"].tag()],
outline="red",
fill="red",
)
break
if "node" in tags:
node = tags["node"]
if node.is_inside(cx, cy, self.halo):
active.append(node)
if node not in self.active_nodes:
node.activate()
self.active_nodes.append(node)
# are we close to any anchor points?
point = node.check_anchor_points(cx, cy, self.halo)
if point is None:
self.canvas.delete("type=active_anchor")
else:
node.activate_anchor_point(point, self.halo)
result = (node, point)
break
# deactivate any previously active nodes
for node in self.active_nodes:
if node not in active:
node.deactivate()
self.active_nodes = active
self.in_callback = False
return result
[docs]
def find_items(self, x, y, exclude=()):
"""Return the 'top' node under the mouse coordinates x, y
It appears that the canvas find_closest does not work properly in
Python. Even if you give it a tag to look below, it always returns
the topmost item, so we cannot loop through items.
Instead we use find_overlapping, which does return a list. However,
if the mouse is e.g. inside a rectangle bat far enough from the edges
find_overlapping does not find it. In this case we use the current
tag to find the object.
"""
items = self.canvas.find_overlapping(
x + self.halo / 2, y + self.halo / 2, x - self.halo / 2, y - self.halo / 2
)
if len(items) == 0:
# If we are within e.g. a rectangle, it may not overlap
# but will be the current item, so if nothing overlaps
# use the current item (if there is one).
items = self.canvas.find_withtag("current")
# Loop backwards since the 'top' item is at the end of the list
# and is probably the item we want.
for item in items[::-1]:
if item in exclude:
continue
tags = self.get_tags(item)
if "node" in tags:
node = tags["node"]
if node.is_inside(x, y, self.halo):
# are we close to any anchor points?
point = node.check_anchor_points(x, y, self.halo)
return ("node", node, point)
return ("item", item)
return None
[docs]
def drag_arrow(self, event):
"""Drag an arrow from the anchor on the node to the mouse
Used when creating a new edge.
"""
logger.debug("drag arrow")
node, anchor, x, y, arrow = self.data
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
logger.debug(
" x = {}, y = {}, cx = {}, cy = {}, arrow = {}".format(x, y, cx, cy, arrow)
)
self.canvas.coords(arrow, x, y, cx, cy)
# Check for being near another nodes anchor point
result = self.find_items(cx, cy, exclude=(arrow,))
logger.debug(" result = {}".format(result))
if result is not None and result[0] == "node":
logger.debug(" node = {}".format(node))
logger.debug(" result[1] = {}".format(result[1]))
if node == result[1]:
self.canvas.delete("type=active_anchor")
logger.debug(" deactivate {}".format(result[1]))
result[1].deactivate()
else:
logger.debug(" activate_node {} {}".format(result[1], result[2]))
self.activate_node(result[1], result[2])
[docs]
def drop_arrow(self, event):
"""The user has dropped a new arrow somewhere!
If it is on another node, make the connection.
If it is in empty space or on the original node,
just cancel the operation.
"""
self.canvas.bind("<B1-Motion>", "")
self.canvas.bind("<ButtonRelease-1>", "")
node, anchor, x, y, arrow = self.data
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
self.canvas.coords(arrow, x, y, cx, cy)
# Check for being near another nodes anchor point
result = self.find_items(cx, cy)
self.canvas.delete(arrow)
if result is not None and result[0] == "node":
other_node, point = result[1:]
if node != other_node and point is not None:
edge_subtype = node.default_edge_subtype()
self.add_edge(
node,
other_node,
"execution",
anchor1=anchor,
anchor2=point,
edge_subtype=edge_subtype,
)
self.data = None
self.mouse_op = None
[docs]
def drag_arrow_base(self, event):
"""Drag the base of an exisiting arrow"""
# move arrow
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
self.canvas.coords(self.data["arrow"], cx, cy, self.data["x1"], self.data["y1"])
# move base icon
deltax = cx - self._x0
deltay = cy - self._y0
self._x0 = cx
self._y0 = cy
self.canvas.move(self.data["arrow_base"], deltax, deltay)
# move the label if there is one
edge = self.data["edge"]
if edge.has_label:
self.canvas.coords(
edge.label_id,
edge.label_position(cx, cy, self.data["x1"], self.data["y1"]),
)
self.canvas.coords(edge.label_bg_id, self.canvas.bbox(edge.label_id))
# Check for being near another nodes anchor point
result = self.find_items(
cx, cy, exclude=(self.data["arrow"], self.data["arrow_base"])
)
if result is not None and result[0] == "node":
self.activate_node(result[1], result[2], exclude=(self.data["edge"].node2,))
[docs]
def activate_node(self, node, point=None, exclude=()):
"""Activate a node, i.e. display the anchor points,
unless it is in the exclusion list. Also, if the
anchor point is given, make it active.
"""
active = []
if node in exclude:
self.canvas.delete("type=active_anchor")
node.deactivate()
else:
active.append(node)
if node not in self.active_nodes:
node.activate()
self.active_nodes.append(node)
if point is None:
self.canvas.delete("type=active_anchor")
else:
node.activate_anchor_point(point, self.halo)
# deactivate any previously active nodes
for node in self.active_nodes:
if node not in active:
node.deactivate()
self.active_nodes = active
[docs]
def drop_arrow_base(self, event):
"""The user has dropped the arrow somewhere!
If it is on another node, make the connection.
If it is in empty space or on the original node,
just cancel the operation.
"""
self.canvas.bind("<B1-Motion>", "")
self.canvas.bind("<ButtonRelease-1>", "")
# Check for being near another nodes anchor point
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
result = self.find_items(cx, cy)
edge = self.data["edge"]
if result is None:
# dropped on empty space
self.canvas.delete("type=arrow_base")
self.canvas.delete("type=arrow_head")
edge.draw()
elif result[0] == "node":
# dropped on another node
node1, anchor1 = result[1:]
if edge.node1 == node1:
edge.anchor1 = anchor1
edge.draw()
else:
# remove current connection and create new, in
# that order -- otherwise tend to remove edge
# completely if it is moved on same node.
node2 = edge.node2
anchor2 = edge.anchor2
edge_subtype = edge.edge_subtype
self.remove_edge(self.data["arrow"])
self.add_edge(
node1,
node2,
edge_type="execution",
edge_subtype=edge_subtype,
anchor1=anchor1,
anchor2=anchor2,
)
self.data = None
self.mouse_op = None
[docs]
def drag_arrow_head(self, event):
"""Drag the head of an arrow"""
# move arrow
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
self.canvas.coords(self.data["arrow"], self.data["x0"], self.data["y0"], cx, cy)
# move base icon
deltax = cx - self._x0
deltay = cy - self._y0
self._x0 = cx
self._y0 = cy
self.canvas.move(self.data["arrow_head"], deltax, deltay)
# move the label if there is one
edge = self.data["edge"]
if edge.has_label:
self.canvas.coords(
edge.label_id,
edge.label_position(self.data["x0"], self.data["y0"], cx, cy),
)
self.canvas.coords(edge.label_bg_id, self.canvas.bbox(edge.label_id))
# Check for being near another nodes anchor point
result = self.find_items(
cx, cy, exclude=(self.data["arrow"], self.data["arrow_head"])
)
if result is not None and result[0] == "node":
self.activate_node(result[1], result[2], exclude=(self.data["edge"].node1,))
[docs]
def drop_arrow_head(self, event):
"""The user has dropped the arrow somewhere!
If it is on another node, make the connection.
If it is in empty space or on the original node,
just cancel the operation.
"""
self.canvas.bind("<B1-Motion>", "")
self.canvas.bind("<ButtonRelease-1>", "")
# Check for being near another nodes anchor point
cx = int(self.canvas.canvasx(event.x))
cy = int(self.canvas.canvasy(event.y))
result = self.find_items(cx, cy)
edge = self.data["edge"]
if result is None:
# dropped on empty space
self.canvas.delete("type=arrow_base")
self.canvas.delete("type=arrow_head")
edge.draw()
elif result[0] == "node":
# dropped on another node
node2, anchor2 = result[1:]
if edge.node2 == node2:
edge.anchor2 = anchor2
edge.draw()
else:
# remove current connection and create new, in
# that order -- otherwise tend to remove edge
# completely if it is moved on same node.
node1 = edge.node1
anchor1 = edge.anchor1
self.remove_edge(self.data["arrow"])
self.add_edge(
node1, node2, "execution", anchor1=anchor1, anchor2=anchor2
)
self.data = None
self.mouse_op = None
[docs]
def right_click_on_arrow(self, event, item, tags):
"""Handle a right click on an arrow"""
if self.popup_menu is not None:
self.popup_menu.destroy()
self.popup_menu = tk.Menu(self.canvas, tearoff=0)
self.popup_menu.add_command(
label="Delete", command=lambda: self.remove_edge(item)
)
self.popup_menu.tk_popup(event.x_root, event.y_root, 0)
[docs]
def remove_edge(self, item):
"""Remove an edge from the graph and visually"""
tags = self.get_tags(item)
edge = tags["edge"]
tag = edge.tag()
self.graph.remove_edge(
edge.node1, edge.node2, edge.edge_type, edge.edge_subtype
)
# Delete the tag, not item, so that we get all labels, etc.
self.canvas.delete(tag)
self.canvas.delete("type=arrow_base")
self.canvas.delete("type=arrow_head")
[docs]
def print_edges(self, event=None):
"""Print all the edges. Useful for debugging!"""
print("All edges in tk_flowchart")
for edge in self.edges():
print(
" {} {} {} {} {} {}".format(
edge.node1.tag,
edge.anchor1,
edge.node2.tag,
edge.anchor2,
edge.edge_type,
edge.edge_subtype,
)
)
[docs]
def print_items(self):
"""Print all the items on the canvas, for debugging"""
print()
for item in self.canvas.find_withtag("type=arrow"):
print("{}: {}".format(item, self.canvas.gettags(item)))
[docs]
def run(self, event=None):
"""Run the current flowchart"""
# Ensure that the flowchart is up-to-date
flowchart = self.update_flowchart()
# Do the graphical "stuff"
self._job_handler.submit_with_dialog(flowchart=flowchart)
[docs]
def push(self):
"""Save a copy of the current flowchart on the stack."""
self.update_flowchart()
self._stack.append(self.flowchart.to_dict())
[docs]
def pop(self):
"""Replace the current flowchart with the version on the stack."""
self.flowchart.from_dict(self._stack.pop())
self.from_flowchart()
[docs]
def pop_and_discard(self):
"""Remove the saved copy from the stack"""
self._stack.pop()
[docs]
def update_flowchart(self):
"""Update the non-graphical flowchart"""
wf = self.flowchart
# Make sure there is nothing in the flowchart
wf.clear(all=True)
# Add all the non-graphical nodes, making copies so that
# when the flowchart is cleared our objects still exist
translate = {}
for node in self:
translate[node] = wf.add_node(copy.copy(node.node))
node.update_flowchart()
# And the edges
for edge in self.edges():
attr = {}
for key in edge:
if key not in ("node1", "node2", "edge_type", "edge_subtype", "canvas"):
attr[key] = edge[key]
node1 = translate[edge.node1]
node2 = translate[edge.node2]
wf.add_edge(node1, node2, edge.edge_type, edge.edge_subtype, **attr)
return wf
[docs]
def from_flowchart(self):
"""Recreate the graphics from the non-graphical flowchart"""
wf = self.flowchart
self.clear()
# Add all the non-graphical nodes, making copies so that
# when the flowchart is cleared our objects still exist
translate = {}
for node in wf:
extension = node.extension
if extension is None:
# Start node
translate[node] = self.get_node("1")
else:
new_node = copy.copy(node)
logger.debug("creating {} node".format(extension))
plugin = self.plugin_manager.get(extension)
logger.debug(" plugin object: {}".format(plugin))
tk_node = plugin.create_tk_node(
tk_flowchart=self, canvas=self.canvas, node=new_node
)
translate[node] = tk_node
tk_node.from_flowchart()
self.graph.add_node(tk_node)
tk_node.draw()
# And the edges
for edge in wf.edges():
node1 = translate[edge.node1]
node2 = translate[edge.node2]
attr = {}
for key in edge:
if key not in ("node1", "node2"):
attr[key] = edge[key]
self.add_edge(node1, node2, **attr)
def _bound_to_mousewheel(self, event):
"""Set the bindings on the canvas, used when the
mouse enters the canvas
"""
self.canvas.bind_all("<MouseWheel>", self._on_mousewheel)
# self.canvas.bind_all("<Button-4>", self._on_mousewheel)
# self.canvas.bind_all("<Button-5>", self._on_mousewheel)
def _unbound_to_mousewheel(self, event):
"""Remove the bindings on the canvas, used when the
mouse leaves the canvas
"""
self.canvas.unbind_all("<MouseWheel>")
# self.canvas.unbind_all("<Button-4>")
# self.canvas.unbind_all("<Button-5>")
def _on_mousewheel(self, event):
"""Handle the mousewheel or similar events.
There are two choices for how to scroll, and it
may differ from OS to OS.
As set up here on a Mac the mouse drags the canvas in
the direction of travel, thus to go down in the canvas
you drag upwards, and vice versa.
Flip the signs to change this
"""
if event.num == 5 or event.delta < 0:
delta = 1
else:
delta = -1
self.canvas.yview_scroll(delta, "units")
x0, y0, x1, y1 = self.canvas.cget("scrollregion").split(" ")
f0, f1 = self.canvas.yview()
y = (int(y1) - int(y0)) * f0
tk._default_root.tk.call(self.canvas, "moveto", self.background, 0, y)
[docs]
def xview(self, command, amount, *args):
"""Scroll in the x direction, keeping the background picture stationary"""
self.canvas.xview(command, amount, *args)
x0, y0, x1, y1 = self.canvas.cget("scrollregion").split(" ")
f0, f1 = self.canvas.xview()
x = (int(x1) - int(x0)) * f0
f0, f1 = self.canvas.yview()
y = (int(y1) - int(y0)) * f0
tk._default_root.tk.call(self.canvas, "moveto", self.background, x, y)
[docs]
def yview(self, command, amount, *args):
"""Scroll in the y direction, keeping the background picture stationary"""
self.canvas.yview(command, amount, *args)
x0, y0, x1, y1 = self.canvas.cget("scrollregion").split(" ")
f0, f1 = self.canvas.xview()
x = (int(x1) - int(x0)) * f0
f0, f1 = self.canvas.yview()
y = (int(y1) - int(y0)) * f0
tk._default_root.tk.call(self.canvas, "moveto", self.background, x, y)
[docs]
def clean_layout(self, event=None):
"""Clean the visual layout of the flowchart"""
# clear the visited flag
for node in self:
node.node.visited = False
# get the node to start the traversal
node = self.get_node("1")
# traverse the nodes, finding what loops they are in
self._loops = {} # what loops a node is in
self._in_loop = {"start": []} # ordered list of nodes directly in a loop
loops = tuple()
self._loop_helper(loops, node)
logger.debug("\nloops\n\n{}".format(pprint.pformat(self._loops)))
logger.debug("\nin loops\n\n{}".format(pprint.pformat(self._in_loop)))
# Move the nodes to the correct place on the grid
x = 0
y = 0
self._loopxy = {"start": (0, 0)}
self._layout_nodes("start", x, y)
logger.debug("\nmax x,y\n\n{}".format(pprint.pformat(self._loopxy)))
# Fix the edges
for edge in self.edges():
# only work on edges that go upwards
x0, y0 = edge.node1.anchor_point(edge.anchor1)
x1, y1 = edge.node2.anchor_point(edge.anchor2)
logger.debug(" edge {}: {}, {} to {}, {}".format(edge, x0, y0, x1, y1))
if y1 < y0:
edge.coords = [x0, y0]
logger.debug(" edge.node1 = {}".format(edge.node1))
loop = self._loops[edge.node1][-1]
xmax, ymax = self._loopxy[loop]
xmax = (xmax + 1) * self.grid_x
ymax = (ymax + 1) * self.grid_y
dx = 10 * len(self._loops[edge.node1])
dy = 0
# Down far enough
edge.coords.append(x0)
edge.coords.append(ymax - dy)
# Right far enough
edge.coords.append(xmax - dx)
edge.coords.append(ymax - dy)
# up
edge.coords.append(xmax - dx)
edge.coords.append(y1)
# and to the node
edge.coords.append(x1)
edge.coords.append(y1)
else:
edge.coords = [x0, y0, x1, y1]
# Redraw everything
self.draw()
del self._loops
del self._in_loop
del self._loopxy
def _layout_nodes(self, loop, x, y):
"""Recursively position nodes"""
for node in self._in_loop[loop]:
x0 = int(node.x)
y0 = int(node.y)
node.x = int((x + 0.5) * self.grid_x)
node.y = int((y + 0.5) * self.grid_y)
logger.debug(
"node {} {} = {:3d} {:3d} ({:3d} {:3d}) {}".format(
x, y, int(node.x), int(node.y), x0, y0, node
)
)
xmax, ymax = self._loopxy[loop]
if x > xmax:
xmax = x
if y > ymax:
ymax = y
self._loopxy[loop] = (xmax, ymax)
if node in self._in_loop:
self._loopxy[node] = (x + 1, y)
x1, y = self._layout_nodes(node, x + 1, y)
self._loopxy[loop] = (x1, y)
else:
y += 1
return x, y
def _loop_helper(self, loops, node):
"""A helper to traverse graph finding the grid locations of the nodes"""
node.node.visited = True
self._loops[node] = loops
logger.debug("node = {}, loops = {}".format(node, loops))
if len(loops) == 0:
self._in_loop["start"].append(node)
else:
self._in_loop[loops[-1]].append(node)
if node.node_type == "loop":
loops = loops + (node,)
self._in_loop[node] = []
# nodes in the loop
for edge in self.graph.edges(node, direction="out"):
if edge.edge_type == "execution" and edge.edge_subtype == "loop":
node2 = edge.node2
if not node2.node.visited:
self._loop_helper(loops, node2)
loops = loops[0:-1]
# end exiting the loop
for edge in self.graph.edges(node, direction="out"):
if edge.edge_type == "execution" and edge.edge_subtype == "exit":
node2 = edge.node2
if not node2.node.visited:
self._loop_helper(loops, node2)
else:
for edge in self.graph.edges(node, direction="out"):
if edge.edge_type == "execution":
node2 = edge.node2
if not node2.node.visited:
self._loop_helper(loops, node2)