# -*- coding: utf-8 -*-
"""The JobServer for the SEAMM environment.
"""
import collections.abc
from datetime import datetime, timezone
import json
import logging
import os
from pathlib import Path
import psutil
import shutil
import sqlite3
import subprocess
import sys
import time
import traceback
import seamm_jobserver
import seamm_util
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)
[docs]
def run():
    """Command-line entry point: create a JobServer and start it."""
    seamm_jobserver.JobServer().start()
[docs]
def humanize(memory, suffix="B", kilo=1024):
    """Scale a quantity to a human-readable string with a unit prefix.

    For example::

        1253656 => '1.20 MiB'
        1253656678 => '1.17 GiB'

    Parameters
    ----------
    memory : int or float
        The quantity to format, e.g. a number of bytes.
    suffix : str
        The unit appended after the prefix, by default "B".
    kilo : int
        The scale factor: 1000 for SI prefixes (k, M, ...) or 1024 for binary
        prefixes (Ki, Mi, ...).

    Returns
    -------
    str
        The scaled value with two decimals, a space, and the prefixed unit.

    Raises
    ------
    ValueError
        If `kilo` is neither 1000 nor 1024.
    """
    if kilo == 1000:
        units = ["", "k", "M", "G", "T", "P"]
    elif kilo == 1024:
        units = ["", "Ki", "Mi", "Gi", "Ti", "Pi"]
    else:
        raise ValueError("kilo must be 1000 or 1024!")

    # Stop scaling at the largest known prefix so that huge values are still
    # formatted (e.g. '1024.00 PiB') instead of falling out of the loop and
    # implicitly returning None.
    for unit in units[:-1]:
        if memory < kilo:
            return f"{memory:.2f} {unit}{suffix}"
        memory /= kilo
    return f"{memory:.2f} {units[-1]}{suffix}"
[docs]
class TkTextHandler(logging.StreamHandler):
    """Logging handler that appends each formatted record to a text widget.

    Parameters
    ----------
    widget
        The object that receives the text. Only its ``insert(index, chars)``
        method is used, as provided by Tk text widgets.
    """

    def __init__(self, widget):
        super().__init__()
        self.text = widget

    def emit(self, record):
        """Append the formatted record, followed by a newline, to the widget.

        Any failure while formatting or inserting is passed to
        ``handleError`` rather than raised, so a problem with the widget
        cannot break the code that issued the log call.
        """
        try:
            msg = self.format(record)
            self.text.insert("end", msg)
            self.text.insert("end", "\n")
        except Exception:
            self.handleError(record)
[docs]
class JobServer(collections.abc.MutableMapping):
def __init__(self, logger=logger):
    """Set up an idle JobServer with no database and no GUI.

    Parameters
    ----------
    logger : logging.Logger
        The logger used for the server's messages; defaults to this
        module's logger.
    """
    super().__init__()

    self.logger = logger

    # Polling intervals, in seconds, for the job check and the status update
    self.check_interval = 1
    self.status_interval = 5

    # Parsed options; filled in by initialize()
    self.options = None
    self.seamm_options = None

    # Set to True to ask the main loop to exit
    self.stop = False

    # Job counters
    self.total_jobs = 0
    self.previous_jobs = 0
    self.successful_jobs = 0
    self.ended_jobs = 0
    self.failed_jobs = 0

    # Database connection and its path
    self._db = None
    self._db_path = None

    # Bookkeeping for the jobs being tracked
    self._tasks = set()
    self._jobs = {}
    self._times = {"JobServer": {}}

    # Tk state: root window, pending `after` ids and the widgets
    self._tk_root = None
    self._after_id = None
    self._status_id = None
    self._widget = {}
# Provide dict-like access to the widgets to make the code cleaner. These
# five methods are the abstract methods required by
# collections.abc.MutableMapping; all of them operate on self._widget.
def __getitem__(self, key):
    """Return the widget stored under `key` (``self[key]``)."""
    return self._widget[key]


def __setitem__(self, key, value):
    """Store the widget `value` under `key` (``self[key] = value``)."""
    self._widget[key] = value


def __delitem__(self, key):
    """Destroy the widget stored under `key` and remove it (``del self[key]``)."""
    if key in self._widget:
        self._widget[key].destroy()
    # The removal is unconditional, so a missing key raises KeyError
    del self._widget[key]


def __iter__(self):
    """Iterate over the keys of the stored widgets."""
    return iter(self._widget)


def __len__(self):
    """Return the number of stored widgets, e.g. for ``len(self)``."""
    return len(self._widget)
@property
def db_path(self):
    """The path of the current database, or None if there is none."""
    return self._db_path


@db_path.setter
def db_path(self, value):
    """Switch to the database at `value`, closing any previous connection.

    Setting the path it already has does nothing; setting None just closes
    the current connection.
    """
    if value == self._db_path:
        return

    # Close any connection to the previous database
    connection = self._db
    if connection is not None:
        connection.close()
        self._db = None

    if value is not None:
        self.logger.info(f"Opening the database '{value}'")
        self._db = sqlite3.connect(value)
    self._db_path = value


@property
def db(self):
    """The open sqlite3 connection, or None if no database is open."""
    return self._db
[docs]
def check_for_finished_jobs(self):
    """Check whether tracked jobs have finished and update the counters.

    A job counts as finished when its process is no longer running, is a
    zombie, or cannot be queried at all. Finished jobs are classified by
    exit code as successful (0 or None), failed (any other code) or
    "unknown" (no exit code can be obtained, e.g. for a process this server
    did not start itself), and are then dropped from the bookkeeping.
    """
    finished = []
    for job_id, data in self._jobs.items():
        pid = data["pid"]
        process = data["process"]
        try:
            is_running = process.is_running()
            # A zombie has exited but has not been reaped yet
            if process.status() == psutil.STATUS_ZOMBIE:
                is_running = False
        except Exception:
            # Best effort: if the process cannot be queried, treat it as gone
            is_running = False

        if is_running:
            self.logger.debug(f"Job {job_id} is running as process {pid}")
            continue

        finished.append(job_id)
        try:
            # `returncode` stays None until the child is polled, so poll it
            # when possible. This collects the real exit code (and reaps the
            # zombie) instead of reporting every job as successful. Objects
            # without poll() fall back to `returncode`, which raises for a
            # plain psutil.Process and yields "unknown".
            poll = getattr(process, "poll", None)
            status = poll() if callable(poll) else process.returncode
        except Exception:
            status = "unknown"
        self.logger.debug(f"Job {job_id} finished, code={status}.")

        if status is None or status == 0:
            self.logger.info(f"Job {job_id} finished successfully ({pid=}).")
            self.successful_jobs += 1
        elif status == "unknown":
            self.logger.info(f"Job {job_id} finished with unknown status ({pid=}).")
            self.ended_jobs += 1
        else:
            self.logger.info(f"Job {job_id} failed ({pid=} {status=}).")
            self.failed_jobs += 1

    # Remove afterwards: the dict must not change size while it is iterated
    for job_id in finished:
        del self._jobs[job_id]
        self._times.pop(job_id, None)
[docs]
def check_for_new_jobs(self):
    """Start every job in the database whose status is 'submitted'.

    Each job is launched with ``self.start_job`` and its row is then marked
    'running', with the start time and the process id recorded. The change
    is committed per job, so jobs already started stay recorded even if a
    later one fails to start.
    """
    cursor = self.db.cursor()
    self.logger.debug("Checking jobs in datastore")
    cursor.execute(
        "SELECT id, path, json_extract(parameters, '$.cmdline')"
        " FROM jobs"
        " WHERE status = 'submitted'"
    )
    # Read all submitted jobs up front. Fetching row by row while reusing the
    # cursor for the UPDATE discarded the remaining rows, so only one job was
    # started per check.
    submitted = cursor.fetchall()

    for job_id, path, cmdline in submitted:
        # json_extract gives NULL when the parameters have no 'cmdline'
        cmdline = [] if cmdline is None else json.loads(cmdline)
        pid = self.start_job(job_id, path, cmdline)
        current_time = datetime.now(timezone.utc)
        self.db.execute(
            "UPDATE jobs"
            " SET status='running', started = ?,"
            " parameters=json_set(jobs.parameters, '$.pid', ?)"
            " WHERE id = ?",
            (current_time, pid, job_id),
        )
        self.db.commit()
        self.logger.info(f"Started job {job_id} with pid={pid}, path={path}")
[docs]
def gui_create(self):
    """Create the tkinter GUI.

    Builds the root window with a notebook holding a "Log" tab and a
    "Status" tab plus an "Exit" button, writes the startup information into
    the log widget, and attaches a handler to ``self.logger`` so that
    messages of level INFO and above are echoed into the log widget.

    Reads ``self.options``, ``self.seamm_options`` and ``self._ini_files``,
    so they must be set before this is called.
    """
    # tkinter is imported here rather than at module level, so it is only
    # loaded when the GUI is actually created.
    import tkinter as tk
    import tkinter.ttk as ttk
    from tkinter.scrolledtext import ScrolledText

    # Initialize Tk
    self._tk_root = tk.Tk()
    # Closing the window goes through the same confirmation as "Exit"
    self._tk_root.protocol("WM_DELETE_WINDOW", self.gui_on_closing)
    app_name = f"JobServer {self.seamm_options['root']}"
    self._tk_root.title(app_name)

    # The menus
    menu = tk.Menu(self._tk_root)

    # Set the about and preferences menu items on Mac
    if sys.platform.startswith("darwin"):
        app_menu = tk.Menu(menu, name="apple")
        menu.add_cascade(menu=app_menu)
        app_menu.add_command(label="About " + app_name, command=self.gui_about)
        app_menu.add_separator()
        self._tk_root.createcommand(
            "tk::mac::ShowPreferences", self.gui_preferences
        )
        # self._tk_root.createcommand(
        # "tk::mac::OpenDocument", tk_flowchart.open_file
        # )
        self.CmdKey = "Command-"
    else:
        self.CmdKey = "Control-"

    # The notebook fills the window; row 0 / column 0 take any extra space
    notebook = ttk.Notebook(self._tk_root)
    notebook.grid(row=0, column=0, columnspan=2, sticky=tk.NSEW)
    self["notebook"] = notebook
    self._tk_root.rowconfigure(0, weight=1)
    self._tk_root.columnconfigure(0, weight=1)

    # Button for exiting the JobServer
    w = ttk.Button(self._tk_root, text="Exit", command=self.gui_on_closing)
    w.grid(row=1, column=1, sticky=tk.E)

    # Tab for the log
    self["log frame"] = frame = ttk.Frame(notebook)
    notebook.add(frame, text="Log", sticky=tk.NSEW)

    # Add a scrolled text area for logging
    self["log"] = log = ScrolledText(frame, wrap=tk.WORD, font=("TkFixedFont",))
    log.grid(row=0, column=0, columnspan=2, sticky=tk.NSEW)
    frame.rowconfigure(0, weight=1)
    frame.columnconfigure(0, weight=1)
    # "Clear" empties the whole log widget
    w = ttk.Button(frame, text="Clear", command=lambda: log.delete("1.0", "end"))
    w.grid(row=1, column=0, sticky=tk.W)

    # Insert the initial log info into the widget
    logfile = self.options["log_file"]
    self["log"].insert("end", f"The JobServer is starting in {Path.cwd()}\n")
    self["log"].insert(
        "end", f" version = {seamm_jobserver.__version__}\n"
    )
    self["log"].insert("end", f" datastore = {self.db_path}\n")
    self["log"].insert("end", f" check interval = {self.check_interval}\n")
    self["log"].insert("end", f" log file = {logfile}\n")
    if not self.options["no_windows"]:
        self["log"].insert("end", "Using the GUI.\n")
    if len(self._ini_files) == 0:
        self["log"].insert("end", "No .ini files were used\n")
    else:
        self["log"].insert("end", "The following .ini files were used:\n")
        for filename in self._ini_files:
            self["log"].insert("end", f" {filename}\n")
    self["log"].insert("end", "\n")

    # And set up logging to echo to the log widget
    th = TkTextHandler(self["log"])
    formatter = logging.Formatter("%(message)s")
    th.setFormatter(formatter)
    th.setLevel(logging.INFO)
    self.logger.addHandler(th)

    # Tab for the status
    self["status frame"] = frame = ttk.Frame(notebook)
    notebook.add(frame, text="Status", sticky=tk.NSEW)

    # Add a scrolled text area for the status report
    self["status"] = w = ScrolledText(frame, wrap=tk.WORD, font=("TkFixedFont",))
    w.grid(row=0, column=0, columnspan=2, sticky=tk.NSEW)
    frame.rowconfigure(0, weight=1)
    frame.columnconfigure(0, weight=1)

    # Fill the screen: 90% of each screen dimension less 50 pixels, offset
    # from the top-left corner by 5% of the screen size
    sw = self._tk_root.winfo_screenwidth()
    sh = self._tk_root.winfo_screenheight()
    width = int(0.9 * sw) - 50
    height = int(0.9 * sh) - 50
    x = int(0.1 * sw / 2)
    y = int(0.1 * sh / 2)
    self._tk_root.geometry(f"{width}x{height}+{x}+{y}")
[docs]
def gui_about(self):
    """Show a dialog giving the version of the JobServer."""
    from tkinter import messagebox

    version_text = f"SEAMM JobServer v{seamm_jobserver.__version__}"
    messagebox.showinfo("About SEAMM JobServer", version_text)
[docs]
def gui_event_loop(self):
    """The callback for the main loop when using Tk.

    Checks for finished and new jobs, then re-schedules itself to run again
    after ``self.check_interval`` seconds. Errors raised by the checks are
    logged so that polling keeps going. Once ``self.stop`` is set, the Tk
    main loop is asked to quit instead and nothing further is done.
    """
    if self.stop:
        self._tk_root.quit()
        # Shutting down: do not start new jobs or re-arm the timer
        return

    try:
        self.check_for_finished_jobs()
        self.check_for_new_jobs()
    except Exception as e:
        self.logger.error(f"Error: {e}\n\n{traceback.format_exc()}")

    # Tk's after() takes the delay in milliseconds
    self._after_id = self._tk_root.after(
        int(self.check_interval * 1000), self.gui_event_loop
    )
[docs]
def gui_on_closing(self):
    """Ask the user to confirm exiting; if confirmed, set the stop flag."""
    from tkinter import messagebox

    confirmed = messagebox.askyesno("Exit", "Do you want to exit the JobServer?")
    if confirmed:
        self.stop = True
[docs]
def gui_preferences(self):
    """Show the preferences dialog; currently it only says there are none."""
    from tkinter import messagebox

    title = "SEAMM JobServer Preferences"
    messagebox.showinfo(title, "Currently there are no preferences")
[docs]
def gui_status(self, status):
    """Replace the contents of the status widget with a report of `status`.

    Parameters
    ----------
    status : dict
        The status dictionary, with the keys produced by ``self.status()``.
    """
    widget = self["status"]
    widget.delete("1.0", "end")

    def put(chunk):
        """Append a piece of text to the status widget."""
        widget.insert("end", chunk)

    def usage(entry):
        """Format the cpu fields and the memory fields of one process."""
        cpu_part = f"cpu {entry['cpu %']:.1f}% {entry['cpu time']:.1f}"
        memory_part = f"memory {entry['resident memory']} {entry['memory %']:.1f}%"
        return cpu_part, memory_part

    put(f"Status at {status['time']}\n\n")

    # The job counters
    counters = (
        ("Jobs previously running: ", "previous jobs", "\n"),
        (" started: ", "total jobs", "\n"),
        (" completed successfully: ", "successful jobs", "\n"),
        (" failed: ", "failed jobs", "\n"),
        (" unknown status: ", "ended jobs", "\n\n"),
    )
    for label, key, ending in counters:
        put(f"{label}{status[key]:4d}{ending}")

    # Cpu usage on machine
    machine = (
        (" User time: ", "machine user time"),
        ("System time: ", "machine system time"),
        (" Idle time: ", "machine idle time"),
        (" CPU %: ", "machine % cpu"),
    )
    for label, key in machine:
        put(f"{label}{status[key]:10.1f}\n")

    # Memory on the machine
    available = status["available memory"]
    pct = status["memory % used"]
    total = status["total memory"]
    put(f" Memory available: {available} {pct:5.1f}%")
    put(f" total: {total}")
    put("\n\n")

    # The jobserver itself
    if "JobServer" in status:
        cpu_part, memory_part = usage(status["JobServer"])
        put(f"JobServer: {cpu_part} {memory_part}\n")

    # Each job, followed by its subprocesses
    jobs = status["Jobs"]
    for job_id in sorted(jobs):
        job = jobs[job_id]
        cpu_part, memory_part = usage(job)
        put(f"\n{job_id}: {cpu_part} {memory_part}\n")
        if "sub processes" not in job:
            continue
        children = job["sub processes"]
        for pid in sorted(children):
            child = children[pid]
            cpu_part, memory_part = usage(child)
            put(f" {pid}: {child['name']} {cpu_part} {memory_part}\n")
[docs]
def gui_status_loop(self):
    """The Tk callback that refreshes the status display.

    Gets the current status, writes it as JSON to the configured status
    file (unless that option is "none") and shows it in the GUI. Errors are
    logged. The callback then re-schedules itself to run again after
    ``self.status_interval`` seconds.
    """
    try:
        snapshot = self.status()
        target = self.options["status_file"]
        if target != "none":
            target = Path(target).expanduser()
            with open(target, "w") as fd:
                json.dump(snapshot, fd, indent=4)
        self.gui_status(snapshot)
    except Exception as e:
        self.logger.error(f"Error: {e}\n\n{traceback.format_exc()}")

    # Tk's after() takes the delay in milliseconds
    delay = int(self.status_interval * 1000)
    self._status_id = self._tk_root.after(delay, self.gui_status_loop)
[docs]
def initialize(self):
    """Parse the command-line and setup the JobServer.

    Reads the "JobServer" and "SEAMM" options, attaches a file handler for
    the log file to the logger, logs how the server is starting, creates
    the GUI unless ``--no-windows`` was given, and finally opens the
    database ``seamm.db`` in the datastore directory.
    """
    parser = self.setup_parser()
    parser.parse_args()
    self.options = parser.get_options("JobServer")
    self.seamm_options = parser.get_options("SEAMM")

    # Make sure the logs folder exists (avoid FileNotFoundError)
    logfile = Path(self.options["log_file"]).expanduser()
    logfile.parent.mkdir(parents=True, exist_ok=True)

    # Set the logging level for the JobServer itself. The instance's logger
    # is used throughout so that the level, the file handler and the
    # messages below all apply to the same logger.
    self.logger.setLevel(self.options["log_level"])

    # create file handler
    fh = logging.FileHandler(logfile)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s: %(message)s")
    fh.setFormatter(formatter)

    # add the handlers to the logger
    self.logger.addHandler(fh)

    # Where is the datastore?
    datastore = Path(self.seamm_options["datastore"]).expanduser()

    # Get the database file / instance
    db_path = datastore / "seamm.db"
    self.check_interval = self.options["check_interval"]

    # Log how we are starting
    self._ini_files = parser.get_ini_files()
    self.logger.info(f"The JobServer is starting in {Path.cwd()}")
    self.logger.info(f" version = {seamm_jobserver.__version__}")
    self.logger.info(f" datastore = {db_path}")
    self.logger.info(f" check interval = {self.check_interval}")
    self.logger.info(f" log file = {logfile}")
    if not self.options["no_windows"]:
        self.logger.info("Using the GUI.")
    if len(self._ini_files) == 0:
        self.logger.info("No .ini files were used")
    else:
        self.logger.info("The following .ini files were used:")
        for filename in self._ini_files:
            self.logger.info(f" {filename}")
    self.logger.info("")

    # And create the GUI if needed.
    if not self.options["no_windows"]:
        self.gui_create()

    # Open the database (assigning db_path opens the connection)
    self.db_path = db_path
[docs]
def setup_parser(self):
    """Create the command-line parser and declare the JobServer's options.

    Returns
    -------
    The parser created by ``seamm_util.seamm_parser``, with a "JobServer"
    section added and all of the options below registered.
    """
    parser = seamm_util.seamm_parser("JobServer")
    parser.add_parser("JobServer")

    # One entry per option: (section, option strings, keyword arguments)
    arguments = (
        (
            "SEAMM",
            ("--version",),
            {
                "action": "version",
                "version": f"JobServer version {seamm_jobserver.__version__}",
            },
        ),
        (
            "JobServer",
            ("--log-level",),
            {
                "default": "INFO",
                "type": str.upper,
                "choices": [
                    "NOTSET",
                    "DEBUG",
                    "INFO",
                    "WARNING",
                    "ERROR",
                    "CRITICAL",
                ],
                "help": (
                    "The level of informational output for jobs, defaults to "
                    "'%(default)s'"
                ),
            },
        ),
        (
            "JobServer",
            ("--no-windows", "-nw"),
            {
                "action": "store_true",
                "help": "Don't use a graphical interface.",
            },
        ),
        (
            "JobServer",
            ("--check-interval",),
            {
                "type": float,
                "default": 1.0,
                "action": "store",
                "help": "The interval for checking for new jobs.",
            },
        ),
        (
            "JobServer",
            ("--log-file",),
            {
                "default": "${SEAMM:root}/logs/jobserver.log",
                "action": "store",
                "help": "Where to save the logs.",
            },
        ),
        (
            "JobServer",
            ("--status-file",),
            {
                "default": "${SEAMM:root}/logs/jobserver_status.json",
                "action": "store",
                "help": "Where to save the JSON of the status.",
            },
        ),
    )
    for section, flags, keywords in arguments:
        parser.add_argument(section, *flags, **keywords)

    return parser
[docs]
def start(self):
    """Start the main event loop.

    Initializes the server if that has not been done. Jobs that the
    database lists as 'running' are then reconciled: those whose process is
    still running are tracked again, the rest are marked 'finished'. After
    that the Tk main loop is run if there is a GUI; otherwise the server
    polls for finished and new jobs and writes the status file until
    ``self.stop`` is set. The database is closed on the way out, including
    when the loop is interrupted by an exception.
    """
    if self.options is None:
        self.initialize()

    # Find any jobs already running. All rows are read first so that the
    # UPDATEs and commits below do not run while the SELECT is still active.
    rows = self.db.execute(
        "SELECT id, json_extract(parameters, '$.pid')"
        " FROM jobs"
        " WHERE status = 'running'"
    ).fetchall()
    for job_id, pid in rows:
        # Finished unless a running process with the recorded pid is found
        finished = True
        if pid is not None:
            try:
                process = psutil.Process(pid=pid)
            except psutil.NoSuchProcess:
                pass
            else:
                if process.is_running():
                    finished = False
                    self._jobs[job_id] = {"pid": process.pid, "process": process}
                    self._times[job_id] = {}

        if finished:
            self.logger.info(f"Job {job_id} already finished (pid={pid}).")
            try:
                current_time = datetime.now(timezone.utc)
                cursor = self.db.cursor()
                cursor.execute(
                    "UPDATE jobs"
                    " SET status = 'finished', finished = ?,"
                    " parameters=json_remove(jobs.parameters, '$.pid')"
                    " WHERE id = ?",
                    (current_time, job_id),
                )
                self.db.commit()
            except Exception as e:
                self.logger.warning(f"Could not update job {job_id}: {e}")
        else:
            self.previous_jobs += 1
            self.logger.info(f"Job {job_id} is still running (pid={pid}).")

    try:
        if self._tk_root is not None:
            # With a GUI the polling is driven by Tk timer callbacks
            self._after_id = self._tk_root.after(10, self.gui_event_loop)
            self._status_id = self._tk_root.after(int(1000), self.gui_status_loop)
            self._tk_root.mainloop()
        else:
            while not self.stop:
                # If nothing to do sleep and then check for new jobs
                if len(self._tasks) == 0:
                    time.sleep(self.check_interval)

                try:
                    self.check_for_finished_jobs()
                    self.check_for_new_jobs()
                    status = self.status()
                    status_file = self.options["status_file"]
                    if status_file != "none":
                        status_file = Path(status_file).expanduser()
                        with open(status_file, "w") as fd:
                            json.dump(status, fd, indent=4)
                except Exception as e:
                    # Log the error, as the GUI loop does, so it reaches the
                    # log file rather than only standard output
                    self.logger.error(f"Error: {e}\n\n{traceback.format_exc()}")
    finally:
        self.logger.info("Stopping the JobServer and closing the database.")
        if self._db is not None:
            self._db.close()
        self.logger.info("Good bye!")
[docs]
def start_job(self, job_id, wdir, cmdline=""):
    """Run the given job as a separate ``run_from_jobserver`` process.

    Parameters
    ----------
    job_id : integer
        The id of the job to run.
    wdir : str or pathlib.Path
        The directory of the job; it is passed to the launcher and is also
        used as the working directory of the new process.
    cmdline : iterable of str
        Extra command-line arguments appended to the launcher's command.

    Returns
    -------
    int
        The process id of the started job.

    Raises
    ------
    FileNotFoundError
        If the ``run_from_jobserver`` executable cannot be located.
    """
    self.logger.info("Starting job {}".format(job_id))

    # Prefer the launcher installed alongside the running interpreter
    candidate = None
    if sys.executable:
        candidate = Path(sys.executable).parent / "run_from_jobserver"

    if candidate is not None and candidate.exists():
        exe = candidate
    else:
        # Otherwise search the PATH. If that also fails, fall back to the
        # sibling path so the launch behaves as it did before.
        exe = shutil.which("run_from_jobserver")
        if exe is None:
            exe = candidate
    if exe is None:
        # Without this, str(None) would be run as a program called "None"
        raise FileNotFoundError(
            "Could not find the 'run_from_jobserver' executable to start "
            f"job {job_id}."
        )

    cmd = [str(exe), str(job_id), str(wdir), str(self.db_path)]

    # Check if in docker container
    cgroup = Path("/proc/self/cgroup")
    in_docker = Path("/.dockerenv").is_file() or (
        cgroup.is_file() and "docker" in cgroup.read_text()
    )
    if in_docker:
        cmd.append("--executor")
        cmd.append("docker")

    # Environment variable for debug output
    if "SEAMM_LOG_LEVEL" in os.environ:
        cmd.append("--log-level")
        cmd.append(os.environ["SEAMM_LOG_LEVEL"])

    cmd.extend(cmdline)

    # Detach the job's standard streams from the JobServer's
    process = psutil.Popen(
        cmd,
        cwd=wdir,
        close_fds=True,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    self._jobs[job_id] = {"pid": process.pid, "process": process}
    self._times[job_id] = {}
    self.logger.debug(f" process = {process}")
    self.total_jobs += 1
    return process.pid
[docs]
def status(self):
    """Get the current load, etc.

    Returns
    -------
    dict
        The time of the report, the job counters, machine-wide cpu times
        and memory, and, once a baseline exists for them, the cpu and
        memory use of the JobServer ("JobServer") and of each tracked job
        and its subprocesses ("Jobs").

    Notes
    -----
    The "cpu %" values are computed from the change in cpu time since the
    previous call, using baselines kept in ``self._times``. The first call
    that sees a process only records its baseline, so that process appears
    in the result from the following call on.
    """
    status = {}
    t_now = datetime.now()
    status["time"] = f"{t_now:%H:%M:%S}"
    status["previous jobs"] = self.previous_jobs
    status["total jobs"] = self.total_jobs
    status["successful jobs"] = self.successful_jobs
    status["failed jobs"] = self.failed_jobs
    status["ended jobs"] = self.ended_jobs

    # Cpu usage on machine
    times = psutil.cpu_times()
    status["machine user time"] = round(times.user, 1)
    status["machine system time"] = round(times.system, 1)
    status["machine idle time"] = round(times.idle, 1)
    status["machine % cpu"] = round(psutil.cpu_percent(interval=None), 1)
    memory = psutil.virtual_memory()
    total = humanize(memory.total)
    available = humanize(memory.available)
    # NOTE(review): despite the key "memory % used" below, this is the
    # percentage of memory that is *available*. gui_status() displays it
    # next to the available memory, so confirm with readers of the status
    # JSON before changing either the key or the value.
    pct = 100 * memory.available / memory.total
    status["available memory"] = available
    status["total memory"] = total
    status["memory % used"] = round(pct, 1)

    # The jobserver itself
    job_id = "JobServer"
    t = self._times[job_id]
    # psutil.Process() with no arguments is the current process
    process = psutil.Process()
    pid = process.pid
    if pid not in t:
        # First time through: just record a baseline for the next call
        t[pid] = {
            "user": 0.0,
            "system": 0.0,
            "time": time.perf_counter(),
        }
    else:
        tpid = t[pid]
        if process.is_running():
            # Still running!
            with process.oneshot():
                cpu = process.cpu_times()
                memory = process.memory_info()
                memory_percent = process.memory_percent()
            memory_percent = float(memory_percent)
            user = float(cpu.user)
            system = float(cpu.system)
            rss = humanize(memory.rss)
            # cpu % = cpu time used since the previous call / elapsed time
            t1 = time.perf_counter()
            delta_t = t1 - tpid["time"]
            pct_user = (user - tpid["user"]) / delta_t * 100.0
            pct_system = (system - tpid["system"]) / delta_t * 100.0
            cpu_percent = pct_user + pct_system
            cpu = user + system
            # Save the new baseline for the next call
            tpid["time"] = t1
            tpid["user"] = user
            tpid["system"] = system
            status["JobServer"] = {
                "cpu %": round(cpu_percent, 1),
                "cpu time": round(cpu, 1),
                "resident memory": rss,
                "memory %": round(memory_percent, 1),
            }

    # The jobs being tracked, keyed by job id
    js = status["Jobs"] = {}
    for job_id, data in self._jobs.items():
        pid = data["pid"]
        if job_id not in self._times:
            self._times[job_id] = {}
        # Baselines for this job's process and its subprocesses, by pid
        t = self._times[job_id]
        if pid not in t:
            # First time this job is seen: record a baseline only
            t[pid] = {
                "user": 0.0,
                "system": 0.0,
                "time": time.perf_counter(),
            }
        else:
            tpid = t[pid]
            process = data["process"]
            try:
                if process.is_running():
                    # Still running!
                    with process.oneshot():
                        cpu = process.cpu_times()
                        memory = process.memory_info()
                        memory_percent = process.memory_percent()
                    memory_percent = float(memory_percent)
                    user = float(cpu.user)
                    system = float(cpu.system)
                    rss = humanize(memory.rss)
                    t1 = time.perf_counter()
                    delta_t = t1 - tpid["time"]
                    pct_user = (user - tpid["user"]) / delta_t * 100.0
                    pct_system = (system - tpid["system"]) / delta_t * 100.0
                    cpu_percent = pct_user + pct_system
                    cpu = user + system
                    tpid["time"] = t1
                    tpid["user"] = user
                    tpid["system"] = system
                    js[job_id] = {
                        "cpu %": round(cpu_percent, 1),
                        "cpu time": round(cpu, 1),
                        "resident memory": rss,
                        "memory %": round(memory_percent, 1),
                    }

                    # All descendants of the job's process, handled the same
                    # way: baseline on first sight, values afterwards
                    sub = js[job_id]["sub processes"] = {}
                    for p in process.children(recursive=True):
                        with p.oneshot():
                            pid = p.pid
                            name = p.name()
                            cpu = p.cpu_times()
                            memory = p.memory_info()
                            memory_percent = p.memory_percent()
                        if pid not in t:
                            t[pid] = {
                                "user": 0.0,
                                "system": 0.0,
                                "time": time.perf_counter(),
                            }
                        else:
                            tpid = t[pid]
                            memory_percent = float(memory_percent)
                            user = float(cpu.user)
                            system = float(cpu.system)
                            rss = humanize(memory.rss)
                            t1 = time.perf_counter()
                            delta_t = t1 - tpid["time"]
                            pct_user = (user - tpid["user"]) / delta_t * 100.0
                            pct_system = (system - tpid["system"]) / delta_t * 100.0
                            cpu_percent = pct_user + pct_system
                            cpu = user + system
                            tpid["time"] = t1
                            tpid["user"] = user
                            tpid["system"] = system
                            sub[pid] = {
                                "name": name,
                                "cpu %": round(cpu_percent, 1),
                                "cpu time": round(cpu, 1),
                                "resident memory": rss,
                                "memory %": round(memory_percent, 1),
                            }
            except psutil.NoSuchProcess:
                # The process went away while it was being examined
                pass
            except Exception as e:
                self.logger.warning(
                    f"Warning getting status of {job_id}: {e}\n\n"
                    f"{traceback.format_exc()}"
                )
    return status
# Run the JobServer when this file is executed as a script
if __name__ == "__main__":
    run()