Source code for molsystem.properties

# -*- coding: utf-8 -*-

import csv
import json
import logging
from pathlib import Path
import pkg_resources

# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)
# Registry of the standard properties: maps the property name to a dict of its
# metadata. It starts empty and is filled by add_properties_from_file().
standard_properties = {}


def add_properties_from_file(path):
    """Load standard property definitions from a CSV file.

    The standard properties recognized by SEAMM. These are officially defined
    properties that can be used anywhere in SEAMM, as long as the type and
    definition correspond to the standard.

    Each property is defined by a string with up to three parts:

        <property name>#<code or 'experiment'>#<technique or model chemistry>

    The property name is required. In most cases this is followed by either
    'experiment' or the name of the code, e.g. 'MOPAC', 'Gaussian', or 'VASP'.
    The final part, if present, is either the experimental technique used to
    measure the property, or the model chemistry, such as 'MP2/6-31G**', 'PM7',
    or a forcefield name such as 'AMBER/ff19SB'.

    You can create other properties on the fly, but they follow the above
    convention and should have an appropriate code and, if necessary, model
    chemistry, so that their full name is unique and does not conflict with any
    other defined name.

    For example, the standard property "enthalpy of formation" refers to the
    experimental heat of formation, or a calculated value comparable to
    experimental values. If you are not sure what the heat of formation in
    e.g. MOPAC is, you could create a new property
    "enthalpy of formation#MOPAC#<parameterization>", which is clearly similar
    to the standard "enthalpy of formation". If the community decides that it
    is indeed the same, it can be replaced by the standard form, and also
    aliased to it for backwards compatibility.

    Parameters
    ----------
    path : str or pathlib.Path
        The CSV file to read. Its first row must be exactly the headers
        "Property", "Type", "Units", "Description", "URL".

    Raises
    ------
    ValueError
        If the first row of the file is not the expected header.

    Note
    ----
    Each data row is stored in the module-level ``standard_properties``
    dictionary under its property name, replacing any existing entry of that
    name. Blank rows are skipped, and an empty file adds nothing.
    """
    expected = ["Property", "Type", "Units", "Description", "URL"]

    # utf-8-sig silently drops a byte-order mark if the file has one.
    with open(path, newline="", encoding="utf-8-sig") as fd:
        reader = csv.reader(fd)

        header = next(reader, None)
        if header is None:
            # An empty file defines no properties.
            return
        headers = [*header]
        if headers != expected:
            raise ValueError(
                "Header of standard properties file not valid: " + ", ".join(headers)
            )

        for row in reader:
            if not row:
                # csv.reader yields [] for a blank line; there is no name to use.
                continue
            name = row[0]
            # Column 0 is the key; the remaining columns are the metadata.
            standard_properties[name] = dict(zip(headers[1:], row[1:]))
# Load the standard properties shipped with the package when it is imported.
path = Path(pkg_resources.resource_filename(__name__, "data/"))
csv_file = path / "standard_properties.csv"
add_properties_from_file(csv_file)


class _Properties(object):
    """A class for handling the properties in the star schema.

    The ``property`` table is the dimension table holding the name, type,
    units and description of each property. The values are stored in one fact
    table per type: ``float_data``, ``int_data``, ``str_data`` and
    ``json_data``, each row referring to a property and to a system and/or
    configuration.

    Parameters
    ----------
    system_db : SystemDB
        The object giving access to the database. This class uses its
        ``cursor`` and ``db`` attributes and indexes it by table name.
    """

    def __init__(self, system_db):
        self._system_db = system_db
        self._standard_properties = None

    @property
    def cursor(self):
        """The database cursor of the SystemDB object."""
        return self.system_db.cursor

    @property
    def db(self):
        """The database connection of the SystemDB object."""
        return self.system_db.db

    @property
    def standard_properties(self):
        """The module-level dictionary of standard properties."""
        return standard_properties

    @property
    def system_db(self):
        """Return the SystemDB object."""
        return self._system_db

    @staticmethod
    def _match_operator(match):
        """Translate a match type -- glob, like or exact -- to its SQL operator."""
        if match == "glob":
            return "GLOB"
        if match == "like":
            return "LIKE"
        if match == "exact":
            return "="
        raise ValueError(f"Unknown match type '{match}'.")

    def _system_id(self, cid):
        """Return the id of the system that configuration `cid` belongs to."""
        self.cursor.execute("SELECT system FROM configuration WHERE id = ?", (cid,))
        row = self.cursor.fetchone()
        if row is None:
            raise KeyError(f"Configuration '{cid}' is not known.")
        return row[0]

    def add(self, name, _type="float", units=None, description="", noerror=False):
        """Add a property to the database.

        By default, it is an error if it already exists!

        Parameters
        ----------
        name : str
            The name of the property.
        _type : str = "float"
            The type of the property -- float, int, str. Default is "float".
        units : str = None
            The units of the property value, as a string that Pint can
            interpret.
        description : str
            A longer, text description of the property.
        noerror: bool = False
            Whether to quietly ignore an existing entry

        Returns
        -------
        int
            The database id of the property.

        Raises
        ------
        ValueError
            If the property already exists and `noerror` is False.

        Note
        ----
        If the property is registered in the list of standard properties,
        just give the name and the rest will be filled in from the standard
        property metadata.
        """
        if self.exists(name):
            if noerror:
                return self.id(name)
            raise ValueError(f"Property '{name}' already exists.")

        table = self.system_db["property"]
        if name in self.standard_properties:
            # The standard definition takes precedence over the arguments.
            data = self.standard_properties[name]
            result = table.append(
                name=name,
                type=data["Type"],
                units=data["Units"],
                description=data["Description"],
            )
        else:
            result = table.append(
                name=name,
                type=_type,
                units=units,
                description=description,
            )
        return result[0]

    def create_schema(self):
        """Add the needed tables to the database."""
        # The property dimension table
        table = self.system_db["property"]
        table.add_attribute("id", coltype="int", pk=True)
        table.add_attribute("name", coltype="str", index="unique")
        table.add_attribute("type", coltype="str")
        table.add_attribute("units", coltype="str")
        table.add_attribute("description", coltype="str")

        # The fact tables and the column type of their values. JSON (array)
        # data is stored as its string representation.
        fact_tables = (
            ("float_data", "float"),
            ("int_data", "int"),
            ("str_data", "str"),
            ("json_data", "str"),
        )
        # The columns of each index created on every fact table.
        indexes = (
            ("configuration", "property", "value"),
            ("system", "property", "value"),
            ("property", "value"),
            ("configuration", "property"),
            ("system", "property"),
            ("configuration",),
            ("system",),
        )

        for table_name, value_type in fact_tables:
            table = self.system_db[table_name]
            table.add_attribute("id", coltype="int", pk=True)
            table.add_attribute(
                "configuration", coltype="int", references="configuration"
            )
            table.add_attribute("system", coltype="int", references="system")
            table.add_attribute("property", coltype="int", references="property")
            table.add_attribute("value", coltype=value_type)

            for columns in indexes:
                index_name = f"{table_name}_idx_" + "_".join(columns)
                column_list = ", ".join(columns)
                self.db.execute(
                    f"CREATE INDEX {index_name} ON {table_name}({column_list})"
                )

    def description(self, _property):
        """The description of a property

        Parameters
        ----------
        _property : int or str
            The id or name of the property.

        Returns
        -------
        str
            The description of the property.
        """
        return self.metadata(_property)[2]

    def exists(self, name):
        """Whether the named property exists.

        Parameters
        ----------
        name : str
            The name of the property.

        Returns
        -------
        bool
            True if the property is registered, False otherwise.
        """
        self.cursor.execute("SELECT COUNT(*) FROM property WHERE name = ?", (name,))
        return self.cursor.fetchone()[0] != 0

    def get(
        self,
        _id,
        pattern="*",
        match="glob",
        is_system=False,
        include_system_properties=False,
        include_configuration_properties=False,
        types=["float", "int", "str", "json"],
    ):
        """Get the property value(s)

        Parameters
        ----------
        _id : int
            The id of the configuration or system.
        is_system : bool=False
            Whether the ID is for a system, not a configuration.
        pattern : str = "*"
            The pattern of the property.
        match : str="glob"
            Whether to use exact, glob, or 'like' matching.
        include_system_properties : bool=False
            For a configuration, whether to include properties that are on the
            system.
        include_configuration_properties : bool=False
            For a system, whether to include properties that are on the
            configurations of the system.
        types : [str] = ["float", "int", "str", "json"]
            The type of results to return.

        Returns
        -------
        {str: {str: value}}
            The matching property values, keyed by property name. Each entry
            has the keys "value", "sid" (system id) and "cid" (configuration
            id).

        Raises
        ------
        ValueError
            If `match` is not one of "glob", "like" or "exact".
        KeyError
            If system properties are requested for an unknown configuration.
        """
        op = self._match_operator(match)

        selects = []
        args = []

        def add_selects(where, key):
            # One SELECT per requested fact table, each with two placeholders.
            for _type in types:
                selects.append(
                    " SELECT name, type, value, system, configuration\n"
                    f" FROM property, {_type}_data\n"
                    f"{where}"
                    f" AND {_type}_data.property = property.id\n"
                    f" AND name {op} ?\n"
                )
                args.extend((key, pattern))

        # Values on the system itself have no configuration.
        system_only = " WHERE system = ?\n AND configuration is Null\n"

        if is_system:
            if include_configuration_properties:
                add_selects(" WHERE system = ?\n", _id)
            else:
                add_selects(system_only, _id)
        else:
            add_selects(" WHERE configuration = ?\n", _id)
            if include_system_properties:
                add_selects(system_only, self._system_id(_id))

        self.cursor.execute(" UNION\n".join(selects), args)

        converters = {"float": float, "int": int, "json": json.loads}
        result = {}
        for name, _type, value, sid, cid in self.cursor:
            result[name] = {"sid": sid, "cid": cid}
            convert = converters.get(_type)
            if convert is None:
                result[name]["value"] = value
                continue
            try:
                result[name]["value"] = convert(value)
            except Exception as e:
                # Best effort: drop values that cannot be converted, with a
                # warning, rather than failing the whole request.
                logger.warning(f"Error with value of property '{name}': {str(e)}")
                del result[name]
        return result

    def id(self, name):
        """The id for a property

        Parameters
        ----------
        name : str
            The name of the property.

        Returns
        -------
        int
            The database id for the property.

        Raises
        ------
        KeyError
            If the property is not in the database.
        """
        self.cursor.execute("SELECT id FROM property WHERE name = ?", (name,))
        row = self.cursor.fetchone()
        if row is None:
            raise KeyError(f"Property '{name}' is not known.")
        return row[0]

    def known_properties(self):
        """List the known properties."""
        self.cursor.execute("SELECT name FROM property")
        return [row[0] for row in self.cursor.fetchall()]

    def list(
        self,
        _id,
        pattern="*",
        match="glob",
        is_system=False,
        include_system_properties=False,
        include_configuration_properties=False,
        as_ids=False,
        types=["float", "int", "str", "json"],
    ):
        """Get the list of matching properties for the configuration.

        Parameters
        ----------
        _id : int
            The id of the configuration.
        is_system : bool=False
            Whether the ID is for a system, not a configuration.
        pattern : str = "*"
            The pattern of the property.
        match : str="glob"
            Whether to use exact, glob, or 'like' matching.
        include_system_properties : bool=False
            For a configuration, whether to include properties that are on the
            system.
        include_configuration_properties : bool=False
            For a system, whether to include properties that are on the
            configurations of the system.
        as_ids : bool=False
            Whether to return the ids rather than names
        types : [str] = ["float", "int", "str", "json"]
            The type of results to return.

        Returns
        -------
        [str] or [int]
            The matching properties.

        Raises
        ------
        ValueError
            If `match` is not one of "glob", "like" or "exact".
        KeyError
            If system properties are requested for an unknown configuration.
        """
        op = self._match_operator(match)

        subselects = []
        args = [pattern]

        def add_subselects(where, key):
            # One sub-SELECT per requested fact table, one placeholder each.
            for _type in types:
                subselects.append(f" SELECT property FROM {_type}_data {where}\n")
                args.append(key)

        if is_system:
            if include_configuration_properties:
                add_subselects("WHERE system = ?", _id)
            else:
                add_subselects("WHERE system = ? AND configuration is Null", _id)
        else:
            add_subselects("WHERE configuration = ?", _id)
            if include_system_properties:
                add_subselects(
                    "WHERE configuration is Null AND system = ?",
                    self._system_id(_id),
                )

        if not subselects:
            # No types requested, so nothing can match; "IN ()" is not valid SQL.
            return []

        column = "id" if as_ids else "name"
        sql = (
            f"SELECT {column} FROM property\n"
            f" WHERE name {op} ?\n"
            " AND id IN (\n" + " UNION\n".join(subselects) + " )"
        )

        self.cursor.execute(sql, args)
        return [row[0] for row in self.cursor]

    def metadata(self, _property):
        """The metadata for a property

        Parameters
        ----------
        _property : int or str
            The id or name of the property.

        Returns
        -------
        str, str, str
            The type, units, and description of the property

        Raises
        ------
        KeyError
            If the property is neither in the database nor a standard
            property.
        """
        column = "name" if isinstance(_property, str) else "id"
        self.cursor.execute(
            f"SELECT type, units, description FROM property WHERE {column} = ?",
            (_property,),
        )
        row = self.cursor.fetchone()
        if row is not None:
            return row

        # Not in the database yet, so fall back to the standard definitions.
        if _property in self.standard_properties:
            data = self.standard_properties[_property]
            return data["Type"], data["Units"], data["Description"]

        raise KeyError(f"Property '{_property}' is not known.")

    def name(self, pid):
        """The name of a property

        Parameters
        ----------
        pid : int
            The id of the property.

        Returns
        -------
        str
            The name of the property.

        Raises
        ------
        KeyError
            If there is no property with that id.
        """
        self.cursor.execute("SELECT name FROM property WHERE id = ?", (pid,))
        row = self.cursor.fetchone()
        if row is None:
            raise KeyError(f"Property id = '{pid}' is not known.")
        return row[0]

    def property_id(self, name):
        "Obsolete routine kept for compatibility"
        return self.id(name)

    def property_name(self, pid):
        "Obsolete routine kept for compatibility"
        return self.name(pid)

    def property_type(self, _property):
        "Obsolete routine kept for compatibility"
        return self.type(_property)

    def put(self, _id, _property, value, is_system=False):
        """Store the given property value for the configuration or system

        Parameters
        ----------
        _id : int
            The id of the configuration.
        _property : int or str
            The id or name of the property.
        value : int, float, or str
            The value to store.
        is_system : bool=False
            Whether the ID is for a system, not a configuration.

        Raises
        ------
        ValueError
            If the property is given by name and is neither in the database
            nor a standard property.
        KeyError
            If the configuration is not known.
        """
        # Get the property id and type
        if isinstance(_property, str):
            if not self.exists(_property):
                if _property not in self.standard_properties:
                    raise ValueError(f"Property '{_property}' does not exist.")
                # Standard properties are registered on first use.
                self.add(_property)
            pid = self.id(_property)
        else:
            pid = _property
        ptype = self.type(pid)

        if ptype == "json":
            # Stored in compact form, without whitespace after separators.
            value = json.dumps(value, separators=(",", ":"))

        if is_system:
            sql = (
                f"INSERT INTO {ptype}_data (system, property, value)"
                " VALUES(?, ?, ?)"
            )
            self.db.execute(sql, (_id, pid, value))
        else:
            # Rows for a configuration also record the system it belongs to.
            sid = self._system_id(_id)
            sql = (
                f"INSERT INTO {ptype}_data (configuration, system, property, value)"
                " VALUES(?, ?, ?, ?)"
            )
            self.db.execute(sql, (_id, sid, pid, value))

    def query(self, *args, what=["configuration"]):
        """Find configurations that match the query defined by the args.

        Parameters
        ----------
        args : sequence
            The criteria, as consecutive groups of (property, operator,
            value), where the property is a name or id. The operator
            "between" takes two values, the lower and upper bounds. Criteria
            are combined with AND and may be grouped using "(" and ")" items.
        what : [str or int]
            What to return ... "configuration", property names or ids.

        Returns
        -------
        {str: []}
            The list of values for each item requested, keyed by
            "configuration_id" or the name of the property.

        Raises
        ------
        ValueError
            If the parentheses are unbalanced or a criterion is incomplete.
        KeyError
            If a property is not known.

        Note
        ----
        Values of str and json properties are passed to the database as bound
        parameters. The operator and the values of numeric properties are
        inserted directly into the SQL, so they must come from a trusted
        source.
        """
        columns = []
        criteria = []
        tables = {}
        results = []
        types = []
        n_tables = 0

        def alias_for(pid, ptype):
            # Each property gets its own alias (t0, t1, ...) of its fact table.
            nonlocal n_tables
            aliases = tables.setdefault(ptype + "_data", {})
            if pid not in aliases:
                aliases[pid] = f"t{n_tables}"
                n_tables += 1
            return aliases[pid]

        for item in what:
            if isinstance(item, str) and item == "configuration":
                columns.append(" t0.configuration")
                results.append("configuration_id")
                types.append("configuration")
                continue

            pid = self.id(item) if isinstance(item, str) else item
            ptype = self.type(pid)
            alias = alias_for(pid, ptype)
            columns.append(f" {alias}.value")
            criteria.append(f"{alias}.property == {pid}")
            results.append(item if isinstance(item, str) else self.name(pid))
            types.append(ptype)

        sql = "SELECT" + ",".join(columns)

        # And the WHERE clause ... one string per level of parentheses.
        where = {0: ""}
        params = []
        items = iter(args)
        level = 0
        for item in items:
            if item == "(":
                if where[level] not in ("", " ("):
                    where[level] += " AND"
                level += 1
                where[level] = " ("
            elif item == ")":
                if level == 0:
                    raise ValueError("Unbalanced ')' in the query.")
                where[level] += ")"
                where[level - 1] += where[level]
                del where[level]
                level -= 1
            else:
                # Property name or id
                pid = self.id(item) if isinstance(item, str) else item
                ptype = self.type(pid)
                alias = alias_for(pid, ptype)

                if where[level] not in ("", " ("):
                    where[level] += " AND"
                where[level] += f" {alias}.property == {pid}"

                try:
                    operator = next(items)
                    between = operator.lower() == "between"
                    if between:
                        values = [next(items), next(items)]
                    else:
                        values = [next(items)]
                except StopIteration:
                    raise ValueError(
                        f"Incomplete criterion for property '{item}' in the query."
                    ) from None

                if ptype == "json":
                    # Same compact form that put() stores.
                    values = [json.dumps(v, separators=(",", ":")) for v in values]

                if ptype in ("str", "json"):
                    # Bind text so that quotes in the values cannot break the SQL.
                    terms = ["?"] * len(values)
                    params.extend(values)
                else:
                    terms = [f"{v}" for v in values]

                if between:
                    where[level] += (
                        f" AND {alias}.value BETWEEN {terms[0]} AND {terms[1]}"
                    )
                else:
                    where[level] += f" AND {alias}.value {operator} {terms[0]}"

        if level != 0:
            raise ValueError("Unbalanced '(' in the query.")

        # Put it all together ... FROM clause plus needed criteria
        sql += " FROM "
        sql += ", ".join(
            f"{table} AS {alias}"
            for table, data in tables.items()
            for alias in data.values()
        )

        # Now for the WHERE clause: tie every table to the configuration of
        # the first one, then select the property each alias stands for.
        conditions = [
            f"{alias}.configuration = t0.configuration"
            for data in tables.values()
            for alias in data.values()
            if alias != "t0"
        ]
        conditions.extend(criteria)
        sql += " WHERE " + " AND ".join(conditions)

        # The user criteria, if any
        if where[0] != "":
            if conditions:
                sql += " AND"
            sql += where[0]

        logger.info(sql)

        result = {item: [] for item in results}
        try:
            self.cursor.execute(sql, params)
        except Exception:
            logger.error("Query failed: %s\n  parameters: %s", sql, params)
            raise
        for row in self.cursor:
            for item, ptype, value in zip(results, types, row):
                if ptype == "json":
                    result[item].append(json.loads(value))
                else:
                    result[item].append(value)

        return result

    def type(self, _property):
        """The type of a property

        Parameters
        ----------
        _property : int or str
            The id or name of the property.

        Returns
        -------
        str
            The type of the property.
        """
        return self.metadata(_property)[0]

    def units(self, _property):
        """The unit string of a property

        Parameters
        ----------
        _property : int or str
            The id or name of the property.

        Returns
        -------
        str
            The units string for the property.
        """
        return self.metadata(_property)[1]