Source code for seamm_installer.pip

# -*- coding: utf-8 -*-
import logging
import pprint
import re
import subprocess

import requests

logger = logging.getLogger(__name__)

# Regular expressions for pypi query results.
SNIPPET_RE = re.compile(r"<a class=\"package-snippet\".*>")
NAME_RE = re.compile(r"<span class=\"package-snippet__name\">(.+)</span>")
VERSION_RE = re.compile(r".*<span class=\"package-snippet__version\">(.+)</span>")
DESCRIPTION_RE = re.compile(r".*<p class=\"package-snippet__description\">(.+)</p>")
NEXT_RE = re.compile(
    r'<a href="/search/.*page=(.+)" ' 'class="button button-group__button">Next</a>'
)


[docs] class Pip(object): """ Class for handling pip Attributes ---------- """ def __init__(self): logger.debug("Creating Pip {str(type(self))}") self._base_url = "https://pypi.org/search/"
[docs] def install(self, package): """Install the requested package. Parameters ---------- package : str The package of interest. """ if isinstance(package, list): packages = " ".join(package) command = f"pip install {packages}" else: command = f"pip install {package}" try: subprocess.check_output( command, shell=True, text=True, stderr=subprocess.STDOUT ) except subprocess.CalledProcessError as e: logger.warning(f"Calling pip, returncode = {e.returncode}") logger.warning(f"Output: {e.output}") raise
[docs] def list(self, outdated=False, uptodate=False): """List the installed packages. Parameters ---------- outdated: bool If true, list only the outdated packages. Cannot be used with `uptodate`. uptodate: bool If true, list only the up-to-date packages. Cannot be used with `outdated`. """ command = "pip list" if outdated: if uptodate: raise ValueError("May only use one of 'outdated' and 'uptodate'.") command += " --outdated" elif uptodate: command += " --uptodate" try: output = subprocess.check_output( command, shell=True, text=True, stderr=subprocess.STDOUT ) except subprocess.CalledProcessError as e: logger.warning(f"Calling pip, returncode = {e.returncode}") logger.warning(f"Output: {e.output}") raise result = {} for line in output.splitlines(): package, version = line.split() result[package] = version return result
[docs] def search( self, query=None, framework=None, exact=False, progress=False, newline=True, update=None, ): """Search PyPi for packages. Parameters ---------- query : str The text of the query, if any. framework : str The framework classifier, if any. exact : bool = False Whether to only return the exact match, defaults to False. progress : bool = False Whether to show progress dots. newline : bool = True Whether to print a newline at the end if showing progress update : None or method Method to call to e.g. update a progress bar Returns ------- [str] A list of packages matching the query. """ # Can not have exact match if no query term if query is None: exact = False # Set up the arguments for the http get args = {"q": query} if framework is not None: args["c"] = f"Framework::{framework}" logger.debug(f"search query: {args}") # PyPi serves up the results one page at a time, so loop if progress: count = 0 result = {} while True: response = requests.get(self._base_url, params=args) logger.log(5, f"response: {response.text}") snippets = SNIPPET_RE.split(response.text) for snippet in snippets: name = NAME_RE.findall(snippet) version = VERSION_RE.findall(snippet) description = DESCRIPTION_RE.findall(snippet) # Ignore any snippets without data, e.g. the first one. if len(name) > 0: if not exact or name[0] == query: if len(version) == 0: version = None else: version = version[0] if len(description) == 0: description = "no description given" else: description = description[0] result[name[0]] = { "channel": "pypi", "version": version, "description": description, } if exact: break if progress: if update is None: count += 1 if count <= 50: print(".", end="", flush=True) else: count = 1 print("\n.", end="", flush=True) else: update() # See if there is a next page next_page = NEXT_RE.findall(snippet) if len(next_page) == 0: break else: args["page"] = next_page[0] if progress and newline and count > 0: if update is None: print("", flush=True) logger.debug(f"Package information:\n{pprint.pformat(result)}") return result
[docs] def show(self, package): """Return the information for an installed package. Parameters ---------- package : str The package of interest. """ command = f"pip show {package}" try: result = subprocess.check_output( command, shell=True, text=True, stderr=subprocess.STDOUT ) except subprocess.CalledProcessError as e: if "Package(s) not found:" in e.output: result = "" else: logger.warning(f"Calling pip, returncode = {e.returncode}") logger.warning(f"Output: {e.output}") raise data = {} for line in result.splitlines(): key, value = line.split(":", maxsplit=1) key = key.lower() value = value.strip() if "require" in key: value = [x.strip() for x in value.split(",")] if key == "version": data[key] = value else: data[key] = value logger.debug(f"{command}\n{pprint.pformat(data)}") return data
[docs] def uninstall(self, package): """Remove the requested package. Parameters ---------- package : str The package of interest. """ command = f"pip uninstall --yes {package}" try: output = subprocess.check_output( command, shell=True, text=True, stderr=subprocess.STDOUT ) except subprocess.CalledProcessError as e: logger.warning(f"Calling pip, returncode = {e.returncode}") logger.warning(f"Output: {e.output}") raise logger.debug("pip uninstall -->") logger.debug(output)
[docs] def update(self, package): """Update the requested package. Parameters ---------- package : str The package of interest. """ if isinstance(package, list): packages = " ".join(package) command = f"pip install --upgrade {packages}" else: command = f"pip install --upgrade {package}" try: subprocess.check_output( command, shell=True, text=True, stderr=subprocess.STDOUT ) except subprocess.CalledProcessError as e: line = e.output.splitlines()[-1] if "FileNotFoundError" in line: logger.warning(f"Pip returned a warning: {line}") else: logger.warning(f"Calling pip, returncode = {e.returncode}") logger.warning(f"Output: {e.output}") raise