# Source code for molsystem.properties

# -*- coding: utf-8 -*-

import csv
import json
import logging
from pathlib import Path
import pkg_resources

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Registry of the standard properties: maps a property name to a dict of its
# metadata. It is filled in by add_properties_from_file().
standard_properties = {}


def add_properties_from_file(path):
    """Load standard property definitions from a CSV file.

    Each data row of the file is stored in the module-level
    ``standard_properties`` dictionary, keyed by the property name (first
    column), with the remaining columns stored as a dictionary keyed by the
    column header. A property that is already registered is overwritten.

    The standard properties recognized by SEAMM are officially defined
    properties that can be used anywhere in SEAMM, as long as the type and
    definition correspond to the standard.

    Each property is defined by a string with up to three parts:

        <property name>#<code or 'experiment'>#<technique or model chemistry>

    The property name is required. In most cases this is followed by either
    'experiment' or the name of the code, e.g. 'MOPAC', 'Gaussian', or 'VASP'.
    The final part, if present, is either the experimental technique used to
    measure the property, or the model chemistry, such as 'MP2/6-31G**', 'PM7',
    or a forcefield name such as 'AMBER/ff19SB'.

    You can create other properties on the fly, but they follow the above
    convention and should have an appropriate code and, if necessary, model
    chemistry, so that their full name is unique and does not conflict with
    any other defined name.

    For example, the standard property "enthalpy of formation" refers to the
    experimental heat of formation, or a calculated value comparable to
    experimental values. If you are not sure what the heat of formation in e.g.
    MOPAC is, you could create a new property
    "enthalpy of formation#MOPAC#<parameterization>", which is clearly similar
    to the standard "enthalpy of formation". If the community decides that it
    is indeed the same, it can be replaced by the standard form, and also
    aliased to it for backwards compatibility.

    Parameters
    ----------
    path : str or pathlib.Path
        The CSV file to read. Its first row must be exactly the header
        ``Property, Type, Units, Description, URL``.

    Raises
    ------
    ValueError
        If the first row of the file is not the expected header.
    """
    expected_headers = ["Property", "Type", "Units", "Description", "URL"]

    # "utf-8-sig" transparently drops the byte-order mark that spreadsheet
    # programs often write at the start of a CSV file.
    with open(path, newline="", encoding="utf-8-sig") as fd:
        reader = csv.reader(fd)

        # An empty file has no header row; as before, it simply adds nothing.
        headers = next(reader, None)
        if headers is None:
            return
        if headers != expected_headers:
            raise ValueError(
                "Header of standard properties file not valid: " + ", ".join(headers)
            )

        for row in reader:
            # Skip blank lines and rows of empty cells (e.g. a trailing ",,,,")
            # rather than failing on row[0] or registering a nameless property.
            if not any(cell.strip() for cell in row):
                continue
            standard_properties[row[0]] = dict(zip(headers[1:], row[1:]))
# Load the standard properties shipped in this package's "data" directory.
path = Path(pkg_resources.resource_filename(__name__, "data/"))
csv_file = path / "standard_properties.csv"
add_properties_from_file(csv_file)


class _Properties(object):
    """A class for handling the properties in the star schema.

    The ``property`` table is the dimension table holding the name, type,
    units and description of each property. The values themselves are stored
    in one fact table per type: ``float_data``, ``int_data``, ``str_data`` and
    ``json_data``.
    """

    # The fact tables as (table name, column type of the "value" column).
    # JSON values are stored as text.
    _FACT_TABLES = (
        ("float_data", "float"),
        ("int_data", "int"),
        ("str_data", "str"),
        ("json_data", "str"),
    )

    # The column combinations indexed on every fact table, in creation order.
    _FACT_INDEXES = (
        ("configuration", "property", "value"),
        ("system", "property", "value"),
        ("property", "value"),
        ("configuration", "property"),
        ("system", "property"),
        ("configuration",),
        ("system",),
    )

    # The property types, which are also the prefixes of the fact tables.
    _VALUE_TYPES = ("float", "int", "str", "json")

    def __init__(self, system_db):
        self._system_db = system_db
        self._standard_properties = None

    @property
    def cursor(self):
        """The cursor of the system database."""
        return self.system_db.cursor

    @property
    def db(self):
        """The database connection of the system database."""
        return self.system_db.db

    @property
    def standard_properties(self):
        """The module-level dictionary of standard property metadata."""
        return standard_properties

    @property
    def system_db(self):
        """Return the SystemDB object."""
        return self._system_db

    @staticmethod
    def _decode(_type, value):
        """Convert a value read from a fact table according to the property type."""
        if _type == "float":
            return float(value)
        if _type == "int":
            return int(value)
        if _type == "json":
            return json.loads(value)
        return value

    @staticmethod
    def _quote(text):
        """Return `text` as a single-quoted SQL string literal."""
        return "'" + str(text).replace("'", "''") + "'"

    @classmethod
    def _union_sql(cls, condition):
        """SQL selecting (name, type, value) from all fact tables where `condition`."""
        return "\n UNION \n".join(
            "SELECT name, type, value\n"
            f"  FROM property, {ptype}_data\n"
            f" WHERE {ptype}_data.property = property.id AND {condition}"
            for ptype in cls._VALUE_TYPES
        )

    def _existing_id(self, _property):
        """Return the id for a property given by name or id; the name must exist."""
        if not isinstance(_property, str):
            return _property
        if not self.exists(_property):
            raise ValueError(f"Property '{_property}' does not exist.")
        return self.id(_property)

    def _id_and_type_for_put(self, _property):
        """Return (id, type) for a property, registering standard properties."""
        if isinstance(_property, str):
            if not self.exists(_property):
                if _property in self.standard_properties:
                    self.add(_property)
                else:
                    raise ValueError(f"Property '{_property}' does not exist.")
            pid = self.id(_property)
        else:
            pid = _property
        return pid, self.type(pid)

    def add(self, name, _type="float", units=None, description="", noerror=False):
        """Add a property to the database.

        By default, it is an error if it already exists!

        Parameters
        ----------
        name : str
            The name of the property.
        _type : str = "float"
            The type of the property -- float, int, str. Default is "float".
        units : str = None
            The units of the property value, as a string that Pint can
            interpret.
        description : str
            A longer, text description of the property.
        noerror: bool = False
            Whether to quietly ignore an existing entry

        Returns
        -------
        int
            The database id of the property.

        Raises
        ------
        ValueError
            If the property already exists and `noerror` is False.

        Note
        ----
        If the property is registered in the list of standard properties, just
        give the name and the rest will be filled in from the standard property
        metadata.
        """
        if self.exists(name):
            if noerror:
                return self.id(name)
            raise ValueError(f"Property '{name}' already exists.")

        table = self.system_db["property"]
        if name in self.standard_properties:
            # The standard metadata takes precedence over the arguments.
            data = self.standard_properties[name]
            result = table.append(
                name=name,
                type=data["Type"],
                units=data["Units"],
                description=data["Description"],
            )
        else:
            result = table.append(
                name=name,
                type=_type,
                units=units,
                description=description,
            )
        return result[0]

    def create_schema(self):
        """Add the needed tables to the database."""
        # The property dimension table
        table = self.system_db["property"]
        table.add_attribute("id", coltype="int", pk=True)
        table.add_attribute("name", coltype="str", index="unique")
        table.add_attribute("type", coltype="str")
        table.add_attribute("units", coltype="str")
        table.add_attribute("description", coltype="str")

        # The fact tables, which differ only in name and in the value type
        for table_name, value_type in self._FACT_TABLES:
            table = self.system_db[table_name]
            table.add_attribute("id", coltype="int", pk=True)
            table.add_attribute(
                "configuration", coltype="int", references="configuration"
            )
            table.add_attribute("system", coltype="int", references="system")
            table.add_attribute("property", coltype="int", references="property")
            table.add_attribute("value", coltype=value_type)

            for columns in self._FACT_INDEXES:
                index_name = table_name + "_idx_" + "_".join(columns)
                column_list = ", ".join(columns)
                self.db.execute(
                    f"CREATE INDEX {index_name} ON {table_name}({column_list})"
                )

    def description(self, _property):
        """The description of a property

        Parameters
        ----------
        _property : int or str
            The id or name of the property.

        Returns
        -------
        str
            The description of the property.
        """
        return self.metadata(_property)[2]

    def exists(self, name):
        """Whether the named property exists.

        Parameters
        ----------
        name : str
            The name of the property.

        Returns
        -------
        bool
            True if the property is registered, False otherwise.
        """
        self.cursor.execute("SELECT COUNT(*) FROM property WHERE name = ?", (name,))
        return self.cursor.fetchone()[0] != 0

    def get(self, configuration_id, _property="all", include_system_properties=False):
        """Get the given property value for the configuration.

        Parameters
        ----------
        configuration_id : int
            The id of the configuration.
        _property : int or str, or "all"
            The id or name of the property, or all properties if "all".
        include_system_properties : bool=False
            Whether to include properties that are on the system, not any
            configuration

        Returns
        -------
        int, float, or str
            The value of the property, or a dictionary of the values keyed by
            the property name if `_property` is "all".

        Raises
        ------
        ValueError
            If the named property does not exist, or has no value for the
            configuration.
        """
        if include_system_properties:
            self.cursor.execute(
                "SELECT system FROM configuration WHERE id = ?", (configuration_id,)
            )
            system_id = self.cursor.fetchone()[0]

        n_types = len(self._VALUE_TYPES)
        if _property == "all":
            sql = self._union_sql("configuration = ?")
            parameters = (configuration_id,) * n_types
            if include_system_properties:
                sql += "\n UNION \n" + self._union_sql(
                    "configuration IS NULL AND system = ?"
                )
                parameters += (system_id,) * n_types
            self.cursor.execute(sql, parameters)

            return {
                name: self._decode(_type, value)
                for name, _type, value in self.cursor
            }

        pid = self._existing_id(_property)
        ptype = self.type(pid)
        sql = (
            f"SELECT value FROM {ptype}_data\n"
            " WHERE configuration = ? AND property = ?"
        )
        if include_system_properties:
            sql += (
                "\n"
                " UNION \n"
                f"SELECT value FROM {ptype}_data\n"
                " WHERE configuration IS NULL and system = ? AND property = ?"
            )
            self.cursor.execute(sql, (configuration_id, pid, system_id, pid))
        else:
            self.cursor.execute(sql, (configuration_id, pid))

        row = self.cursor.fetchone()
        if row is None:
            raise ValueError(
                f"Property {_property} does not exist for configuration "
                f"{configuration_id}"
            )
        if ptype == "json":
            return json.loads(row[0])
        return row[0]

    def get_for_system(
        self, system_id, _property="all", include_configuration_properties=False
    ):
        """Get the given property value(s) for the system.

        Parameters
        ----------
        system_id : int
            The id of the system.
        _property : int or str of "all"
            The id or name of the property, or "all" for all properties.
        include_configuration_properties : bool=False
            Whether to include properties from any configuration in the system.

        Returns
        -------
        [int, float, or str]
            The value(s) of the property as a list, or a dictionary of values
            keyed by the property name if `_property` is "all".

        Raises
        ------
        ValueError
            If the named property does not exist.
        """
        # Values stored on the system itself have no configuration.
        if include_configuration_properties:
            condition = "system = ?"
        else:
            condition = "configuration IS NULL AND system = ?"

        if _property == "all":
            sql = self._union_sql(condition)
            self.cursor.execute(sql, (system_id,) * len(self._VALUE_TYPES))
            return {
                name: self._decode(_type, value)
                for name, _type, value in self.cursor
            }

        pid = self._existing_id(_property)
        ptype = self.type(pid)
        sql = f"SELECT value FROM {ptype}_data WHERE {condition} AND property = ?"
        result = []
        for row in self.db.execute(sql, (system_id, pid)):
            if ptype == "json":
                result.append(json.loads(row[0]))
            else:
                result.append(row[0])
        return result

    def id(self, name):
        """The id for a property

        Parameters
        ----------
        name : str
            The name of the property.

        Returns
        -------
        int
            The database id for the property.

        Raises
        ------
        KeyError
            If there is no property with that name in the database.
        """
        self.cursor.execute("SELECT id FROM property WHERE name = ?", (name,))
        row = self.cursor.fetchone()
        if row is None:
            raise KeyError(f"Property '{name}' is not known.")
        return row[0]

    def known_properties(self):
        """List the known properties."""
        self.cursor.execute("SELECT name FROM property")
        return [row[0] for row in self.cursor.fetchall()]

    def metadata(self, _property):
        """The metadata for a property

        Parameters
        ----------
        _property : int or str
            The id or name of the property.

        Returns
        -------
        str, str, str
            The type, units, and description of the property

        Raises
        ------
        KeyError
            If the property is neither in the database nor a standard
            property.
        """
        column = "name" if isinstance(_property, str) else "id"
        self.cursor.execute(
            f"SELECT type, units, description FROM property WHERE {column} = ?",
            (_property,),
        )
        row = self.cursor.fetchone()
        if row is not None:
            return row

        # Not in the database yet, so fall back to the standard properties.
        if _property in self.standard_properties:
            data = self.standard_properties[_property]
            return data["Type"], data["Units"], data["Description"]
        raise KeyError(f"Property '{_property}' is not known.")

    def name(self, pid):
        """The name of a property

        Parameters
        ----------
        pid : int
            The id of the property.

        Returns
        -------
        str
            The name of the property.

        Raises
        ------
        KeyError
            If there is no property with that id in the database.
        """
        self.cursor.execute("SELECT name FROM property WHERE id = ?", (pid,))
        row = self.cursor.fetchone()
        if row is None:
            raise KeyError(f"Property id = '{pid}' is not known.")
        return row[0]

    def property_id(self, name):
        "Obsolete routine kept for compatibility"
        return self.id(name)

    def property_name(self, pid):
        "Obsolete routine kept for compatibility"
        return self.name(pid)

    def property_type(self, _property):
        "Obsolete routine kept for compatibility"
        return self.type(_property)

    def put(self, configuration_id, _property, value):
        """Store the given property value for the configuration.

        A standard property that is not yet in the database is added
        automatically.

        Parameters
        ----------
        configuration_id : int
            The id of the configuration.
        _property : int or str
            The id or name of the property.
        value : int, float, or str
            The value to store.

        Raises
        ------
        ValueError
            If the named property is neither in the database nor a standard
            property.
        """
        pid, ptype = self._id_and_type_for_put(_property)
        if ptype == "json":
            value = json.dumps(value, separators=(",", ":"))

        # Get the system id
        self.cursor.execute(
            "SELECT system FROM configuration WHERE id = ?", (configuration_id,)
        )
        system_id = self.cursor.fetchone()[0]

        sql = (
            f"INSERT INTO {ptype}_data (configuration, system, property, value)"
            "      VALUES(?, ?, ?, ?)"
        )
        self.db.execute(sql, (configuration_id, system_id, pid, value))

    def put_for_system(self, system_id, _property, value):
        """Store the given property value for the system.

        A standard property that is not yet in the database is added
        automatically.

        Parameters
        ----------
        system_id : int
            The id of the system.
        _property : int or str
            The id or name of the property.
        value : int, float, or str
            The value to store.

        Raises
        ------
        ValueError
            If the named property is neither in the database nor a standard
            property.
        """
        pid, ptype = self._id_and_type_for_put(_property)
        if ptype == "json":
            value = json.dumps(value, separators=(",", ":"))

        sql = (
            f"INSERT INTO {ptype}_data (system, property, value)"
            "      VALUES(?, ?, ?)"
        )
        self.db.execute(sql, (system_id, pid, value))

    def query(self, *args, what=["configuration"]):
        """Find configurations that match the query defined by the args.

        Parameters
        ----------
        args :
            The criteria, as a flat sequence of tokens. Each criterion is a
            property (name or id), an operator, and a value; the operator
            "between" (any case) is followed by two values. The tokens "("
            and ")" group criteria. All criteria are combined with AND.
        what : [str or int]
            What to return: "configuration" and/or property names or ids.
            The default list is only read, never modified.

        Returns
        -------
        {str: []}
            A dictionary of lists of the values found, keyed by
            "configuration_id" for the configurations and by the property
            name for properties.

        Note
        ----
        String and JSON values are quoted as SQL string literals, but the
        operators and numeric values are inserted into the SQL as given, so
        they must come from trusted code.
        """
        # tables[<table name>][<property id>] is the alias used in the SQL
        # for that property's rows in that fact table.
        tables = {}

        def alias_for(pid, ptype):
            """Return the alias for the property, allocating a new one if needed."""
            aliases = tables.setdefault(ptype + "_data", {})
            if pid not in aliases:
                n_aliases = sum(len(known) for known in tables.values())
                aliases[pid] = f"t{n_aliases}"
            return aliases[pid]

        # The SELECT clause
        columns = []
        criteria = []
        results = []
        types = []
        for item in what:
            if isinstance(item, str) and item == "configuration":
                columns.append("t0.configuration")
                results.append("configuration_id")
                types.append("configuration")
                continue

            pid = self.id(item) if isinstance(item, str) else item
            ptype = self.type(pid)
            alias = alias_for(pid, ptype)
            columns.append(f"{alias}.value")
            criteria.append(f"{alias}.property == {pid}")
            results.append(item if isinstance(item, str) else self.name(pid))
            types.append(ptype)
        sql = "SELECT " + ", ".join(columns)

        # And the WHERE clause ... where[n] is the text of the group being
        # built at nesting level n.
        where = {0: ""}
        level = 0
        items = iter(args)
        for item in items:
            if item == "(":
                level += 1
                where[level] = " ("
            elif item == ")":
                group = where.pop(level) + ")"
                level -= 1
                # Join the group to anything before it at the outer level.
                if where[level] not in ("", " ("):
                    where[level] += " AND"
                where[level] += group
            else:
                # Property name or id
                pid = self.id(item) if isinstance(item, str) else item
                ptype = self.type(pid)
                alias = alias_for(pid, ptype)

                if where[level] not in ("", " ("):
                    where[level] += " AND"
                where[level] += f" {alias}.property == {pid}"

                operator = next(items)
                if operator.lower() == "between":
                    low = next(items)
                    high = next(items)
                    if ptype == "str":
                        low = self._quote(low)
                        high = self._quote(high)
                    elif ptype == "json":
                        low = self._quote(json.dumps(low, separators=(",", ":")))
                        high = self._quote(json.dumps(high, separators=(",", ":")))
                    where[level] += f" AND {alias}.value BETWEEN {low} AND {high}"
                else:
                    value = next(items)
                    if ptype == "str":
                        value = self._quote(value)
                    elif ptype == "json":
                        value = self._quote(json.dumps(value, separators=(",", ":")))
                    where[level] += f" AND {alias}.value {operator} {value}"

        # Put it all together ... FROM clause plus needed criteria
        sources = [
            f"{table} AS {alias}"
            for table, aliases in tables.items()
            for alias in aliases.values()
        ]
        sql += " FROM " + ", ".join(sources)

        # Every alias other than t0 is tied to the configuration of t0.
        joins = [
            f"{alias}.configuration = t0.configuration"
            for aliases in tables.values()
            for alias in aliases.values()
            if alias != "t0"
        ]
        clauses = joins + criteria
        # The user criteria, if any
        if where[0] != "":
            clauses.append(where[0].strip())
        if clauses:
            sql += " WHERE " + " AND ".join(clauses)

        logger.info(sql)

        result = {item: [] for item in results}
        try:
            self.cursor.execute(sql)
        except Exception:
            logger.error(
                "Query failed: joins=%s criteria=%s where=%r",
                joins,
                criteria,
                where[0],
            )
            raise
        for row in self.cursor:
            for item, ptype, value in zip(results, types, row):
                if ptype == "json":
                    result[item].append(json.loads(value))
                else:
                    result[item].append(value)

        return result

    def type(self, _property):
        """The type of a property

        Parameters
        ----------
        _property : int or str
            The id or name of the property.

        Returns
        -------
        str
            The type of the property.
        """
        return self.metadata(_property)[0]

    def units(self, _property):
        """The unit string of a property

        Parameters
        ----------
        _property : int or str
            The id or name of the property.

        Returns
        -------
        str
            The units string for the property.
        """
        return self.metadata(_property)[1]