# -*- coding: utf-8 -*-
"""Mac OS specific routines handling unique operations.
* Creating the 'app'
* Installing Launch Agents to handle the Dashboard and JobServer
"""
import datetime
import getpass
import logging
import os
from pathlib import Path
import plistlib
import shutil
import subprocess
logger = logging.getLogger(__name__)
[docs]
def create_app(
exe_path,
*args,
identifier=None,
name="SEAMM",
version="0.1.0",
user_only=False,
icons=None,
copyright=None,
):
"""Create an application bundle for a Mac app.
Parameters
----------
exe_path : pathlib.Path or str
The path to the executable (required). Either a path-like object or string
identifier : str
The bundle identifier. If None, is set to 'org.molssi.seamm.<name>'.
name : str
The name of the app
version : str = "0.1.0"
The version of the app.
user_only : bool = False
Whether to install for just the current user. Defaults to all users.
icons : pathlib.Path or string
Optional path to the icns file to use.
copyright : str
The human-readable copyright. Defaults to "Copyright 2017-xxxx MolSSI"
"""
if identifier is None:
identifier = "org.molssi.seamm." + name
if copyright is None:
year = datetime.date.today().year
copyright = f"Copyright 2017-{year} MolSSI"
if user_only:
applications_path = Path("~/Applications").expanduser()
else:
applications_path = Path("/Applications")
contents_path = applications_path / (name + ".app") / "Contents"
contents_path.mkdir(mode=0o755, parents=True, exist_ok=True)
# Create the script to run the executable
macos_path = contents_path / "MacOS"
macos_path.mkdir(mode=0o755, parents=False, exist_ok=True)
script_path = macos_path / name
path = Path(exe_path).expanduser().resolve()
cmd = '"' + str(path) + '"'
for arg in args:
cmd += f" {arg}"
script_path.write_text(f"#!/bin/bash\n{cmd}\n")
script_path.chmod(0o755)
# And put the icons in place
resources_path = contents_path / "Resources"
resources_path.mkdir(mode=0o755, parents=False, exist_ok=True)
icons_path = resources_path / (name + ".icns")
path = Path(icons).expanduser().resolve()
shutil.copyfile(path, icons_path)
# write the PList file describing the app.
data = {
"CFBundleIdentifier": identifier,
"CFBundleName": name,
"CFBundleShortVersionString": version,
"CFBundleExecutable": name,
"CFBundleIconFile": icons_path.name,
"CFBundleDevelopmentRegion": "en",
"CFBundlePackageType": "APPL",
"LSApplicationCategoryType": "public.app-category.education",
"NSHumanReadableCopyright": copyright,
}
plist_path = contents_path / "Info.plist"
with plist_path.open(mode="wb") as fd:
plistlib.dump(data, fd)
[docs]
def delete_app(name, missing_ok=False):
"""Delete the app given.
Parameters
----------
name : str
The name of the app.
missing_ok : bool = False
Don't throw an error if the app does not exist.
"""
apps = get_apps()
if name in apps:
shutil.rmtree(apps[name])
elif not missing_ok:
raise FileNotFoundError(f"App '{name}' does not exist.")
[docs]
def get_apps():
paths = (
Path("~/Applications").expanduser(),
Path("/Applications"),
)
apps = {}
for path in paths:
for file_path in path.glob("*.app"):
name = file_path.stem
apps[name] = file_path
return apps
[docs]
def update_app(name, version, missing_ok=False):
"""Update the version for a Mac app.
Parameters
----------
name : str
The name of the app
version : str
The version of the app.
missing_ok : bool = False
Don't throw an error if the app does not exist.
"""
apps = get_apps()
if name in apps:
app_path = Path(apps[name])
contents_path = app_path / "Contents"
plist_path = contents_path / "Info.plist"
with plist_path.open(mode="rb") as fd:
data = plistlib.load(fd)
data["CFBundleShortVersionString"] = version
with plist_path.open(mode="wb") as fd:
plistlib.dump(data, fd)
elif not missing_ok:
raise FileNotFoundError(f"App '{name}' does not exist.")
[docs]
class ServiceManager:
def __init__(self, prefix=""):
"""A manager for handling services (agents) on MacOS.
Parameters
----------
prefix : str
The prefix for all services, limiting searches, etc.
"""
self.prefix = prefix
self._data = None # Dictionary of existing services
self._uid = os.getuid()
self._paths = (
(Path("~/Library/LaunchAgents").expanduser(), f"gui/{self.uid}"),
(Path("/Library/LaunchAgents"), f"gui/{self.uid}"),
(Path("/Library/LaunchDaemons"), "system"),
)
@property
def data(self):
if self._data is None:
self._data = {}
pattern = self.prefix + ".*"
for path, domain in self.paths:
for file_path in path.glob(pattern):
name = file_path.stem
target = f"{domain}/{name}"
short_name = file_path.suffixes[-2][1:]
self._data[short_name] = (domain, target, file_path)
return self._data
@property
def paths(self):
return self._paths
@property
def uid(self):
return self._uid
[docs]
def create(
self,
name,
exe_path,
*args,
user_agent=True,
user_only=True,
stderr_path=None,
stdout_path=None,
exist_ok=False,
):
"""Create a service on MacOS.
The Mac supports three types of services. This function uses `user_agent` and
`user_only` to control which is selected.
1. A user Launch Agent for a single user, which runs while that user is
logged in. (True, True)
2. A Launch Agent installed by the admin that is available for all users,
and runs when any user is logged in. (True, False)
3. A system-wide service that runs when the machine is booted. (False, not
used)
Parameters
----------
name : str
The name of the agent
exe_path : pathlib.Path or str
The path to the executable (required). Either a path-like object or string
args : []
List of arguments for the program.
user_agent : bool = True
Whether to create a per-user agent (True) or system-wide daemon (False)
user_only : bool = True
Whether to install for just the current user (True) or all users (False).
Only affects user agents, not daemons which are always system-wide.
stderr_path : pathlib.Path or str = None
The file to direct stderr. Defaults to "~/SEAMM/logs/<name>.out"
stdout_path : pathlib.Path or str = None
The file to direct stdout. Defaults to "~/SEAMM/logs/<name>.out"
exist_ok : bool = False
If True overwrite an existing file.
"""
identifier = self.prefix + "." + name
if user_agent:
if user_only:
launchd_path = self.paths[0][0]
plist_path = launchd_path / f"{identifier}.plist"
else:
launchd_path = self.paths[1][0]
plist_path = launchd_path / f"{identifier}.plist"
else:
launchd_path = self.paths[2][0]
plist_path = launchd_path / f"{identifier}.plist"
if plist_path.exists():
if not exist_ok:
raise FileExistsError()
if stderr_path is None:
stderr_path = Path(f"~/SEAMM/logs/{name}.out").expanduser()
if stdout_path is None:
stdout_path = Path(f"~/SEAMM/logs/{name}.out").expanduser()
# And the plist file itself.
program_arguments = [str(exe_path)]
for arg in args:
program_arguments.append(str(arg))
plist = {
"Label": identifier,
"KeepAlive": True,
"ProgramArguments": program_arguments,
"ProcessType": "Interactive",
"StandardErrorPath": str(stderr_path),
"StandardOutPath": str(stdout_path),
}
# System-wide daemons need the username
if not user_agent:
username = getpass.getuser()
plist["UserName"] = username
# Reset the service data so it is re-read
self._data = None
# Write the file ... we may not have permission, so catch that.
try:
launchd_path.mkdir(parents=True, exist_ok=True)
with plist_path.open(mode="wb") as fd:
plistlib.dump(plist, fd)
except PermissionError:
path = Path("~/Downloads").expanduser() / f"{identifier}.plist"
with path.open(mode="wb") as fd:
plistlib.dump(plist, fd)
print(f"\nYou do not have permission to write to {launchd_path}.")
print("If you have administrator access, run the following commands:")
print("")
print(f" sudo mv {path} {plist_path}")
print(
" sudo chown root:wheel /Library/LaunchDaemons/org.molssi.seamm"
".dashboard.plist"
)
print("")
print("To move the temporary copy of the file to the correct locations.")
print("Then start the services as follows:")
print("")
except Exception as e:
print("Caught error?")
print(e)
print()
raise
[docs]
def delete(self, service, ignore_errors=False):
services = self.list()
if service in services:
domain, service_target, path = self.data[service]
# Check if it is running
if self.is_running(service):
self.stop(service)
# Now remove the files
path.unlink(missing_ok=True)
# Fix up service data
del self.data[service]
else:
# Check if the plist file exists, and remove if it does.
for path, _ in self.paths:
launchd_path = path / f"{self.prefix}.{service}.plist"
launchd_path.unlink(missing_ok=True)
[docs]
def file_path(self, service):
"Return the path to the plist file for the service."
data = self.data
if service in data:
return data[service][2]
return ""
[docs]
def is_installed(self, service):
return service in self.list()
[docs]
def is_running(self, service):
result = False
services = self.list()
if service in services:
service_target = self.data[service][1]
cmd = f"launchctl print {service_target}"
result = subprocess.run(cmd, shell=True, text=True, capture_output=True)
if result.returncode == 0:
result = True
else:
result = False
else:
result = False
return result
[docs]
def list(self):
return self.data.keys()
[docs]
def restart(self, service, ignore_errors=False):
self.stop(service, ignore_errors=ignore_errors)
self.start(service, ignore_errors=ignore_errors)
[docs]
def start(self, service, ignore_errors=False):
if not self.is_running(service):
services = self.list()
if service in services:
domain, service_target, path = self.data[service]
cmd = f"launchctl bootstrap {domain} {path}"
result = subprocess.run(cmd, shell=True, text=True, capture_output=True)
if result.returncode != 0 and not ignore_errors:
raise RuntimeError(
f"Starting the service '{service}' was not successful:\n"
f"{result.stderr}"
)
elif not ignore_errors:
raise RuntimeError(
f"Service '{service}' cannot be started because it is not installed"
)
[docs]
def status(self, service):
status = {"service": service}
services = self.list()
if service in services:
status["exists"] = True
service_target = self.data[service][1]
cmd = f"launchctl print {service_target}"
result = subprocess.run(cmd, shell=True, text=True, capture_output=True)
status["running"] = result.returncode == 0
# Get the root directory and, for the dashboard, port
path = self.data[service][2]
logger.debug(f"Checking {path} for the root and port")
with path.open(mode="rb") as fd:
data = plistlib.load(fd)
root = None
port = None
name = None
if "ProgramArguments" in data:
lines = iter(data["ProgramArguments"])
for line in lines:
if "--root" in line:
root = next(lines)
if "--port" in line:
port = next(lines)
if "--dashboard-name" in line:
name = next(lines)
status["root"] = root
status["port"] = port
status["dashboard name"] = name
else:
status["exists"] = False
return status
[docs]
def stop(self, service, ignore_errors=False):
services = self.list()
if service in services:
domain, service_target, path = self.data[service]
# Check if it is running
if self.is_running(service):
cmd = f"launchctl bootout {service_target}"
result = subprocess.run(cmd, shell=True, text=True, capture_output=True)
if result.returncode == 0:
pass
elif not ignore_errors:
raise RuntimeError(f"Could not stop the service '{service}':")