Source code for molsystem.bonds

# -*- coding: utf-8 -*-

"""A dictionary-like object for holding bonds

Based on tables in an SQLite database.
"""

from collections.abc import Sequence
from itertools import zip_longest
import logging
import sqlite3

import numpy as np
import pandas

from .column import _Column
from .table import _Table
from .frozencolumn import _FrozenColumn

logger = logging.getLogger(__name__)


[docs] def grouped(iterable, n): "s -> (s0,s1,s2,...sn-1), (sn,sn+1,sn+2,...s2n-1), (s2n,...s3n-1), ..." return zip_longest(*[iter(iterable)] * n)
class _Bonds(_Table): """The Bonds class describes the bonds in the system.""" def __init__(self, configuration): self._configuration = configuration self._system = None super().__init__(configuration._system_db, "bond") def __enter__(self): """Copy the tables to a backup for a 'with' statement.""" self.system_db["bondset_bond"].__enter__() self.system_db["bond"].__enter__() return self def __exit__(self, etype, value, traceback): """Handle returning from a 'with' statement.""" if etype is None: self.configuration.version = self.configuration.version + 1 self.system_db["bondset_bond"].__exit__(etype, value, traceback) return self.system_db["bond"].__exit__(etype, value, traceback) def __eq__(self, other): """Return a boolean if this object is equal to another""" raise NotImplementedError() @property def asymmetric_bondorders(self): """The bond orders of the asymmetric bonds.""" return self.get_column_data("bondorder") @property def ids(self): """The ids of the bonds.""" return self.get_ids() @property def bondorders(self): """The bond orders.""" if self.configuration.symmetry.n_symops == 1: return self.asymmetric_bondorders else: bondorders = self.asymmetric_bondorders asym_bonds = self.configuration.symmetry.bond_to_asymmetric_bond return [bondorders[i] for i in asym_bonds] @property def bondset(self): """The bondset for these bonds.""" return self._configuration.bondset @property def bonds_for_asymmetric_bonds(self): """List of bonds for each asymmetric bond.""" if self.configuration.symmetry.n_symops == 1: result = [[i] for i in range(self.n_asymmetric_bonds)] else: result = [[] for i in range(self.n_asymmetric_bonds)] to_asym = self.configuration.symmetry.bond_to_asymmetric_bond for i, asym_bond in enumerate(to_asym): result[asym_bond].append(i) return result @property def configuration(self): """Return the configuration.""" return self._configuration @property def cursor(self): """The database connection.""" return self.system_db.cursor @property def db(self): """The database connection.""" return self.system_db.db @property def loglevel(self): """The logging level for this module.""" result = logger.getEffectiveLevel() tmp = logging.getLevelName(result) if "Level" not in tmp: result = tmp return result @loglevel.setter def loglevel(self, value): logger.setLevel(value) @property def n_asymmetric_bonds(self): """The number of asymmetric bonds.""" self.cursor.execute( "SELECT COUNT(*) FROM bondset_bond WHERE bondset = ?", (self.bondset,) ) return self.cursor.fetchone()[0] @property def n_bonds(self): """The number of bonds.""" if self.configuration.symmetry.n_symops == 1: return self.n_asymmetric_bonds else: return len(self.configuration.symmetry.bond_atoms) @property def system_db(self): """The system database that we belong to.""" return self._system_db @property def system(self): """The system that we belong to.""" if self._system is None: self.cursor.execute( "SELECT system FROM configuration WHERE id = ?", (self.id,) ) self._system = self.system_db.get_system(self.cursor.fetchone()[0]) return self._system def append(self, **kwargs): """Append one or more bonds The keys give the field for the data. If an existing field is not mentioned, then the default value is used, unless the default is None, in which case an error is thrown. It is an error if there is not a field corrresponding to a key. Parameters ---------- bonds : [sqlite3.Rows] or {str : [int]} The bonds as either SQLite Rows or a dictionary of values or lists. Any other arguments override the same value in 'bonds'. i : [int] The atom indices of the first atom. j : [int] The atom indices of the second atom. bondorder : [int] optional Bond orders, defaults to 1 for single bonds. other : [int, float, str] optional Other optional attributes of bonds. """ if "bonds" in kwargs: bonds = kwargs.pop("bonds") if isinstance(bonds, sqlite3.Row): for item in bonds.keys(): if item != "id": kwargs[item] = bonds[item] elif isinstance(bonds, Sequence) and isinstance(bonds[0], sqlite3.Row): for item in bonds[0].keys(): if item != "id": kwargs[item] = [row[item] for row in bonds] else: try: for key, value in bonds.items(): if isinstance(value, Sequence): kwargs[key] = value else: kwargs[key] = [value] except AttributeError: raise TypeError( "bonds argument must be sqlite3.Row or a mapping " f"(dict) type, not {type(bonds)}." ) # Check keys and lengths of added bonds if "i" not in kwargs or "j" not in kwargs: raise KeyError("The atoms i & j are required!") i = kwargs.pop("i") j = kwargs.pop("j") try: len_i = len(i) except TypeError: len_i = 1 i = (i,) try: len_j = len(j) except TypeError: len_j = 1 j = (j,) if len_i == 0 and len_j == 0: return if len_i == 1: if len_j > 1: i = len_j * [i[0]] elif len_j == 1: j = len_i * [j[0]] elif len_i != len_j: raise IndexError( f'key "j" has the wrong number of values, {len_j}. ' f"Should be 1 or the number of values in i, {len_i}." ) # Ensure that i < j if isinstance(i, np.ndarray): i = i.tolist() if isinstance(j, np.ndarray): j = j.tolist() i2 = [] j2 = [] symop1 = [] symop2 = [] op1s = kwargs["symop1"] if "symop1" in kwargs else [0] * len(i) op2s = kwargs["symop2"] if "symop2" in kwargs else [0] * len(i) if isinstance(op1s, int): op1s = [op1s] if isinstance(op2s, int): op2s = [op2s] for i_, j_, op1, op2 in zip(i, j, op1s, op2s): if not isinstance(i_, int) or not isinstance(j_, int): raise TypeError("'i' and 'j', the atom indices, must be integers") if i_ < j_: i2.append(i_) j2.append(j_) symop1.append(op1) symop2.append(op2) else: i2.append(j_) j2.append(i_) symop1.append(op2) symop2.append(op1) ids = super().append(i=i2, j=j2, **kwargs) # And to the bondset table = _Table(self.system_db, "bondset_bond") table.append(bondset=self.bondset, bond=ids) self.configuration.symmetry.reset_bonds() return ids def bonds(self, *args, asymmetric=False): """Returns an iterator over the bonds. Parameters ---------- args : [str] Added selection criteria for the SQL, one word at a time. asymmetric : bool Whether to produce only the asymmetric bonds Returns ------- sqlite3.Cursor A cursor that returns sqlite3.Row objects for the bonds. """ sql = ( "SELECT bond.* FROM bond, bondset_bond" " WHERE bond.id = bondset_bond.bond" " AND bondset_bond.bondset = ?" ) return self.db.execute(sql, (self.bondset,)) def contains_bond(self, i, j): """Whether there is a bond between atoms i and j.""" # get canonical order if i > j: j, i = i, j sql = ( "SELECT COUNT(*) FROM bond, bondset_bond" " WHERE bond.i = ? AND bond.j = ?" " AND bondset_bond.bondset = ?" ) self.cursor.execute(sql, (i, j, self.bondset)) return self.cursor.fetchone()[0] == 1 def delete_bond(self, i, j=None, force=False): """Delete a bond, if present. Parameters ---------- i : int Either the first atom in the bond or the bond id. j : int = None If not None, then the second atom in the bond. force : bool = False If true, ignore missing bonds. """ # get canonical order if j is None: bond_id = i else: if i > j: j, i = i, j sql = ( "SELECT bond.id FROM bond, bondset_bond" " WHERE bond.i = ? AND bond.j = ?" " AND bondset_bond.bondset = ?" ) self.cursor.execute(sql, (i, j, self.bondset)) bond_id = self.cursor.fetchone() if bond_id is not None: bond_id = bond_id[0] if bond_id is None: if not force: raise ValueError("The bond did not exist.") else: self.cursor.execute("DELETE FROM bond WHERE id = ?", (bond_id,)) self.configuration.symmetry.reset_bonds() def delete(self, atoms=None): """Deletes all the bonds, optionally from atoms. Parameters ---------- atoms : [int] = None The list of atoms whose bonds are deleted Returns ------- None """ if atoms is not None: sql = ( "DELETE FROM bond" " WHERE (i = ? OR j = ?)" " AND id in (" " SELECT bond FROM bondset_bond" " WHERE bondset = ?" " )" ) parameters = [(i, i, self.bondset) for i in atoms] self.db.executemany(sql, parameters) else: sql = ( "DELETE FROM bond" " WHERE id in (" " SELECT bond FROM bondset_bond" " WHERE bondset = ?" " )" ) self.db.execute(sql, (self.bondset,)) def diff(self, other): """Difference between these bonds and another Parameters ---------- other : _Bonds The other bonds to diff against Result ------ result : Dict The differences, described in a dictionary """ result = {} # Check the columns columns = self._columns() other_columns = other._columns() column_defs = ", ".join(columns) other_column_defs = ", ".join(other_columns) if columns == other_columns: column_def = column_defs else: added = columns - other_columns if len(added) > 0: result["columns added"] = list(added) deleted = other_columns - columns if len(deleted) > 0: result["columns deleted"] = list(deleted) in_common = other_columns & columns if len(in_common) > 0: column_def = ", ".join(in_common) else: # No columns shared return result # Need to check the contents of the tables. See if they are in the same # database or if we need to attach the other database temporarily. db = self.system_db other_db = other.system_db detach = False schema = self.schema if db.filename != other_db.filename: if db.is_attached(other_db): other_schema = db.attached_as(other_db) else: # Attach the other system_db in order to do comparisons. other_schema = self.system_db.attach(other_db) detach = True else: other_schema = other.schema bondset = self.bondset other_bondset = other.bondset changed = {} last = None sql = f""" SELECT * FROM ( SELECT {column_def} FROM {other_schema}.bond WHERE id IN ( SELECT bond FROM {other_schema}.bondset_bond WHERE bondset = {other_bondset} ) EXCEPT SELECT {column_def} FROM {schema}.bond WHERE id IN ( SELECT bond FROM {schema}.bondset_bond WHERE bondset = {bondset} ) ) UNION ALL SELECT * FROM ( SELECT {column_def} FROM {schema}.bond WHERE id IN ( SELECT bond FROM {schema}.bondset_bond WHERE bondset = {bondset} ) EXCEPT SELECT {column_def} FROM {other_schema}.bond WHERE id IN ( SELECT bond FROM {other_schema}.bondset_bond WHERE bondset = {other_bondset} ) ) ORDER BY id """ for row in self.db.execute(sql): if last is None: last = row elif row["id"] == last["id"]: # changes = [] changes = set() for k1, v1, v2 in zip(last.keys(), last, row): if v1 != v2: changes.add((k1, v1, v2)) changed[row["id"]] = changes last = None else: last = row if len(changed) > 0: result["changed"] = changed # See about the rows added added = {} sql = f""" SELECT {column_defs} FROM {schema}.bond WHERE id IN ( SELECT bond FROM {schema}.bondset_bond WHERE bondset = {bondset} ) AND id NOT IN ( SELECT bond FROM {other_schema}.bondset_bond WHERE bondset = {other_bondset} ) """ for row in self.db.execute(sql): added[row["id"]] = row[1:] if len(added) > 0: result["columns in added rows"] = row.keys()[1:] result["added"] = added # See about the rows deleted deleted = {} sql = f""" SELECT {other_column_defs} FROM {other_schema}.bond WHERE id IN ( SELECT bond FROM {other_schema}.bondset_bond WHERE bondset = {other_bondset} ) AND id NOT IN ( SELECT bond FROM {schema}.bondset_bond WHERE bondset = {bondset} ) """ for row in self.db.execute(sql): deleted[row["id"]] = row[1:] if len(deleted) > 0: result["columns in deleted rows"] = row.keys()[1:] result["deleted"] = deleted # Detach the other database if needed if detach: self.system_db.detach(other_db) return result def get_as_dict(self, *args): """Return the bond data as a Python dictionary of lists. Parameters ---------- args : [str] Added selection criteria for the SQL, one word at a time. Returns ------- dict(str: []) A dictionary whose keys are the column names and values as lists """ rows = self.bonds(*args) columns = [x[0] for x in rows.description] data = {key: [] for key in columns} for row in rows: for key, value in zip(columns, row): data[key].append(value) return data def get_bond(self, i, j=None, force=False): """Return the row for a bond. Parameters ---------- i : int Either the first atom in the bond or the bond id. j : int = None If not None, then the second atom in the bond. force : bool = False If true, ignore missing bonds. Returns ------- row : SQLite3.Row The Row object for the bond """ if j is None: sql = ( "SELECT bond.* FROM bond, bondset_bond" " WHERE id = ?" " AND bondset_bond.bond = bond.id" " AND bondset_bond.bondset = ?" ) self.cursor.execute(sql, (i, self.bondset)) row = self.cursor.fetchone() if row is None and not force: raise KeyError(f"No bond id = {i} found") else: if i > j: j, i = i, j sql = ( "SELECT bond.* FROM bond, bondset_bond" " WHERE bond.i = ? AND bond.j = ?" " AND bondset_bond.bondset = ?" ) self.cursor.execute(sql, (i, j, self.bondset)) row = self.cursor.fetchone() if row is None and not force: raise KeyError(f"No bond from {i} to {j} found") return row def get_column(self, key): """Return a column of data. Parameters ---------- key : str The column (attribute) to getLogger Returns ------- column : Column The column object requested. """ if key == "i" or key == "j" or key == "id": sql = f""" SELECT {key} FROM bond, bondset_bond WHERE bond.id = bondset_bond.bond AND bondset_bond.bondset = {self.bondset} """ return _FrozenColumn(self, key, sql) else: sql = f""" SELECT id, {key} FROM bond, bondset_bond WHERE bond.id = bondset_bond.bond AND bondset_bond.bondset = {self.bondset} """ return _Column(self, key, sql) def get_column_data(self, key): """Return a column of data as a Python list. Parameters ---------- key : str The column (attribute) to getLogger Returns ------- column : Column The column object requested. """ sql = f""" SELECT {key} FROM bond, bondset_bond WHERE bond.id = bondset_bond.bond AND bondset_bond.bondset = {self.bondset} """ return [row[0] for row in self.db.execute(sql)] def get_ids(self, *args): """The ids of the bonds. Parameters ---------- args : [str] Added selection criteria for the SQL, one word at a time. Returns ------- [int] The ids of the requested bonds. """ sql = ( "SELECT bond.id FROM bond, bondset_bond" " WHERE bond.id = bondset_bond.bond" " AND bondset_bond.bondset = ?" ) parameters = [self.bondset] if len(args) > 0: for col, op, value in grouped(args, 3): if op == "==": op = "=" sql += f' AND "{col}" {op} ?' parameters.append(value) return [x[0] for x in self.db.execute(sql, parameters)] def import_bonds(self, ids): """Import existing bonds into this configuration. Parameters ---------- ids : iterable(int) The ids of the bonds to import. """ table = _Table(self.system_db, "bondset_bond") table.append(bondset=self.bondset, bond=ids) def get_lengths(self, asymmetric=False, as_array=False): """Return the lengths of the bonds. Parameters ---------- asymmetric : bool = False Return the lengths of just the asymmetric bonds. Default, all bonds as_array : bool = False Return the lengths as a Numpy array. Defaults to Python list """ if self.configuration.periodicity == 0: xyz = self.configuration.atoms.get_coordinates(as_array=True) if self.configuration.symmetry.n_symops > 1: raise NotImplementedError("Symmetry not implemented for molecules.") atom_pairs = self.configuration.symmetry.bond_atoms Is = [i for i, j in atom_pairs] Js = [j for i, j in atom_pairs] R = np.linalg.norm(xyz[Is] - xyz[Js], axis=1) else: atom_pairs = self.configuration.symmetry.bond_atoms Is = [i for i, j in atom_pairs] Js = [j for i, j in atom_pairs] uvw = self.configuration.atoms.get_coordinates(as_array=True) offsets = self.configuration.symmetry.bond_offsets offsets1 = [off[0] for off in offsets] offsets2 = [off[1] for off in offsets] delta = uvw[Js] + offsets2 - uvw[Is] - offsets1 dxyz = self.configuration.cell.to_cartesians(delta, as_array=True) R = np.linalg.norm(dxyz, axis=1) if logger.isEnabledFor(logging.DEBUG): import mendeleev uvw = self.configuration.atoms.get_coordinates(as_array=True) logger.debug( "i-j | u v w o1 o2 o3 " "u v w o1 o2 o3 | du dv dw | r" ) for ij, off, i, j, r in zip(atom_pairs, offsets, Is, Js, R.tolist()): off1x, off1y, off1z = off[0] off2x, off2y, off2z = off[1] xi, yi, zi = uvw[i].tolist() xj, yj, zj = uvw[j].tolist() delta = uvw[j] - uvw[i] xd, yd, zd = delta.tolist() logger.debug( f"{i}-{j} | " f"{xi:5.2f} {yi:5.2f} {zi:5.2f}({off1x:4} {off1y:4} {off1z:4}) " f"{xj:5.2f} {yj:5.2f} {zj:5.2f}({off2x:4} {off2y:4} {off2z:4}) " f" | {xd:5.2f} {yd:5.2f} {zd:5.2f}" f" | {r=:7.4f}" ) generators = self.configuration.symmetry.atom_generators gens = [] for tmp in generators: gens.extend(tmp) to_asym = self.configuration.symmetry.atom_to_asymmetric_atom logger.debug("") logger.debug("Coordinates") symbols = self.configuration.atoms.symbols els = mendeleev.element(symbols) radii = [el.covalent_radius_pyykko / 100.0 for el in els] for symbol, radius, tmp in zip(symbols, radii, uvw.tolist()): logger.debug( f" {symbol:2s} {radius=:7.4f} {tmp[0]:7.4f} {tmp[1]:7.4f} " f"{tmp[2]:7.4f}" ) logger.debug("") logger.debug("Distance matrix") count = 0 n_atoms = self.configuration.n_atoms found = [] for i in range(n_atoms): for j in range(n_atoms): i_asym = to_asym[i] j_asym = to_asym[j] radius_i = radii[i] radius_j = radii[j] delta = uvw[j] - uvw[i] offset = [] across_cell = False cell_offset = [] for x in delta.round(2).tolist(): if x == -0.5: across_cell = True cell_offset.append(1) elif x == 0.5: across_cell = True cell_offset.append(-1) else: cell_offset.append(0) if x < -0.5: offset.append(int(0.5 - x)) elif x > 0.5: offset.append(-int(0.5 + x)) else: offset.append(0) # offset = np.select([tmp <= -0.5, tmp > 0.5], [1, -1], 0) delta3 = delta + offset if self.configuration.periodicity != 0: delta3 = self.configuration.cell.to_cartesians( delta3, as_array=True ) r = np.round(np.linalg.norm(delta3), 4) if r > 0.0 and r < 1.1 * (radius_i + radius_j): o1, o2, o3 = offset if i > j: i_asym, j_asym = j_asym, i_asym ii, jj = j, i radius_i, radius_j = radius_j, radius_i o1, o2, o3 = -o1, -o2, -o3 delta = -delta else: ii, jj = i, j if (ii, jj, o1, o2, o3) in found: continue found.append((ii, jj, o1, o2, o3)) count += 1 xi, yi, zi = uvw[ii].tolist() xj, yj, zj = uvw[jj].tolist() xd, yd, zd = delta.tolist() logger.debug( f"{count:2}: {ii}-{jj} " f"({i_asym}:{gens[ii]:2} {j_asym}:{gens[jj]:2}) {r} " f"({o1:2} {o2:2} {o3:2})" f" {xj:7.4f} {yj:7.4f} {zj:7.4f} -" f" {xi:7.4f} {yi:7.4f} {zi:7.4f} =" f" {xd:7.4f} {yd:7.4f} {zd:7.4f}" ) if across_cell: # Which direction to move c1, c2, c3 = cell_offset o1, o2, o3 = o1 + c1, o2 + c2, o2 + c3 xj, yj, zj = xj + c1, yj + c2, zj + c3 xd, yd, zd = xd + c1, yd + c2, zd + c3 count += 1 logger.debug( f"{count:2}: {ii}-{jj} " f"({i_asym}:{gens[ii]:2} {j_asym}:{gens[jj]:2}) {r}" f" ({o1:2} {o2:2} {o3:2})" f" {xj:7.4f} {yj:7.4f} {zj:7.4f} -" f" {xi:7.4f} {yi:7.4f} {zi:7.4f} =" f" {xd:7.4f} {yd:7.4f} {zd:7.4f}" ) if asymmetric: # Lazy approach ... just pull out the first bond of each group rs = [] for tmp in self.configuration.bonds.bonds_for_asymmetric_bonds: if len(tmp) == 0: rs.append(0.0) else: rs.append(R[tmp[0]]) if as_array: return np.array(rs) else: return rs if as_array: return R else: return R.tolist() def symmetric_bonds_to_dataframe(self): """Return the symmetric bonds as a Pandas Dataframe.""" symmetry = self.configuration.symmetry to_asym_bond = symmetry.bond_to_asymmetric_bond atoms = symmetry.bond_atoms bondorders = self.asymmetric_bondorders if self.configuration.periodicity == 0: data = { "AsymBond": [i for i in to_asym_bond], "i_atom": [i for i, j in atoms], "j_atom": [j for i, j in atoms], "bondorder": [bondorders[i] for i in to_asym_bond], } else: offsets = symmetry.bond_offsets data = { "AsymBond": [i for i in to_asym_bond], "i_atom": [i for i, j in atoms], "j_atom": [j for i, j in atoms], "bondorder": [bondorders[i] for i in to_asym_bond], "offset1": [i for i, j, k in offsets], "offset2": [j for i, j, k in offsets], "offset3": [k for i, j, k in offsets], } df = pandas.DataFrame.from_dict(data) return df def to_dataframe(self): """Return the bonds as a Pandas Dataframe.""" cursor = self.bonds() data = {row[0]: row[1:] for row in cursor} columns = [x[0] for x in cursor.description[1:]] df = pandas.DataFrame.from_dict(data, orient="index", columns=columns) return df class _SubsetBonds(_Bonds): """The Bonds class describes the bonds in a subset""" def __init__(self, configuration, subset_id): self._sid = subset_id # Caching self._template_id = None self._template = None super().__init__(configuration) def __eq__(self, other): """Return a boolean if this object is equal to another""" raise NotImplementedError() @property def ids(self): """The ids of the bonds in the subset.""" return self.get_bond_ids() @property def n_asymmetric_bonds(self): """The number of asymmetric bonds.""" sql = """ SELECT COUNT(*) FROM bond WHERE id IN (SELECT bond FROM bondset_bond WHERE bondset = ?) AND i IN (SELECT atom FROM subset_atom WHERE subset = ?) AND j IN (SELECT atom FROM subset_atom WHERE subset = ?) """ self.cursor.execute(sql, (self.bondset, self.subset_id)) return self.cursor.fetchone()[0] @property def n_bonds(self): """The number of bonds in this subset.""" if self.configuration.symmetry.n_symops == 1: return self.n_asymmetric_bonds else: n = 0 asym_bonds = self.bonds_for_asymmetric_bonds b_ids = self.configuration.bonds.ids for i in self.ids: index = b_ids.index(i) n += len(asym_bonds[index]) return n def append(self, **kwargs): """Append one or more bonds. Not currently allowed in subsets.""" raise NotImplementedError("Can't add bonds in a subset yet.") def bonds(self, *args): """Returns an iterator over the bonds. Parameters ---------- args : [str] Added selection criteria for the SQL, one word at a time. Returns ------- sqlite3.Cursor A cursor that returns sqlite3.Row objects for the bonds. """ sql = """ SELECT * FROM bond WHERE id IN (SELECT bond FROM bondset_bond WHERE bondset = ?) AND i IN (SELECT atom FROM subset_atom WHERE subset = ?) AND j IN (SELECT atom FROM subset_atom WHERE subset = ?) """ return self.db.execute(sql, (self.bondset, self.subset_id)) def contains_bond(self, i, j): """Whether there is a bond between atoms i and j.""" # get canonical order if i > j: j, i = i, j sql = """ SELECT COUNT(*) FROM bond WHERE i = ? AND j = ? AND id IN (SELECT bond FROM bondset_bond WHERE bondset = ?) AND i IN (SELECT atom FROM subset_atom WHERE subset = ?) AND j IN (SELECT atom FROM subset_atom WHERE subset = ?) """ self.cursor.execute(sql, (i, j, self.bondset, self.subset_id)) return self.cursor.fetchone()[0] == 1 def delete_bond(self, i, j=None, force=False): """Delete a bond, if present. Not currently allowed in subsets.""" raise NotImplementedError("Can't delete bonds from a subset yet.") def delete(self, atoms=None): """Deletes all the bonds, optionally from atoms. Not currently allowed in subsets.""" raise NotImplementedError("Can't add bonds in a subset yet.") def diff(self, other): """Difference between these bonds and another Not currently allowed in subsets.""" raise NotImplementedError("Can't add bonds in a subset yet.") def get_bond(self, i, j=None, force=False): """Return the row for a bond. Parameters ---------- i : int Either the first atom in the bond or the bond id. j : int = None If not None, then the second atom in the bond. force : bool = False If true, ignore missing bonds. Returns ------- row : SQLite3.Row The Row object for the bond """ if j is None: sql = """ SELECT * FROM bond WHERE id = ? AND id IN (SELECT bond FROM bondset_bond WHERE bondset = ?) AND i IN (SELECT atom FROM subset_atom WHERE subset = ?) AND j IN (SELECT atom FROM subset_atom WHERE subset = ?) """ self.cursor.execute(sql, (i, self.bondset, self.subset_id, self.subset_id)) row = self.cursor.fetchone() if row is None and not force: raise KeyError(f"No bond id = {i} found") else: if i > j: j, i = i, j sql = """ SELECT COUNT(*) FROM bond WHERE i = ? AND j = ? AND id IN (SELECT bond FROM bondset_bond WHERE bondset = ?) AND i IN (SELECT atom FROM subset_atom WHERE subset = ?) AND j IN (SELECT atom FROM subset_atom WHERE subset = ?) """ self.cursor.execute( sql, (i, j, self.bondset, self.subset_id, self.subset_id) ) row = self.cursor.fetchone() if row is None and not force: raise KeyError(f"No bond from {i} to {j} found") return row def get_column(self, key): """Return a column of data. Parameters ---------- key : str The column (attribute) to getLogger Returns ------- column : Column The column object requested. """ if key == "i" or key == "j" or key == "id": sql = f""" SELECT {key} FROM bond AND id IN ( SELECT bond FROM bondset_bond WHERE bondset = {self.bondset} ) AND i IN ( SELECT atom FROM subset_atom WHERE subset = {self.subset_id} ) AND j IN ( SELECT atom FROM subset_atom WHERE subset = {self.subset_id} ) """ return _FrozenColumn(self, key, sql) else: sql = f""" SELECT id, {key} FROM bond AND id IN ( SELECT bond FROM bondset_bond WHERE bondset = {self.bondset} ) AND i IN ( SELECT atom FROM subset_atom WHERE subset = {self.subset_id} ) AND j IN ( SELECT atom FROM subset_atom WHERE subset = {self.subset_id} ) """ return _Column(self, key, sql) def get_column_data(self, key): """Return a column of data as a Python list. Parameters ---------- key : str The column (attribute) to getLogger Returns ------- column : Column The column object requested. """ sql = f""" SELECT {key} FROM bond AND id IN ( SELECT bond FROM bondset_bond WHERE bondset = {self.bondset} ) AND i IN ( SELECT atom FROM subset_atom WHERE subset = {self.subset_id} ) AND j IN ( SELECT atom FROM subset_atom WHERE subset = {self.subset_id} ) """ return [row[0] for row in self.db.execute(sql)]