Source code for ska_dbi.DBI

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
ska_dbi provides simple methods for database access and data insertion.
Features:

- Sqlite connections are supported.
- Integration with numpy record arrays.
- Verbose mode to show transaction information.
"""
import os
import sys
import sqlite3 as dbapi2
from ska_dbi.common import DEFAULT_CONFIG, NoPasswordError


[docs]def _denumpy(x): """ Try using the numpy.tolist() to convert to native python type. DBI's can't typically handle numpy vals.""" try: return x.tolist() except: return x
[docs]class DBI(object): """ Database interface class. Example usage:: db = DBI(dbi='sqlite', server=dbfile, numpy=False, verbose=True) :param dbi: Database interface name (sqlite) :param server: Server name (or file name for sqlite) :param user: User name (optional) :param autocommit: Automatically commit after each transaction. Slower but easier to code. :param numpy: Return multirow results as numpy.recarray; input vals can be numpy types :param verbose: Print transaction info :param authdir: Directory containing authorization files :rtype: DBI object """ def __init__(self, dbi=None, server=None, numpy=True, autocommit=True, verbose=False, **kwargs): if dbi != 'sqlite': raise ValueError(f'ska_dbi.DBI only supports sqlite at this time. Got {dbi}.') self.dbi = dbi self.server = server or DEFAULT_CONFIG[dbi].get('server') self.numpy = numpy self.autocommit = autocommit self.verbose = verbose if self.verbose: print('Connecting to', self.dbi, 'server', self.server) self.conn = dbapi2.connect(self.server) self.Error = dbapi2.Error def __enter__(self): """Context manager enter runtime context. No action required, just return self.""" return self def __exit__(self, exc_type, exc_value, traceback): """ Context manager exit run time context. Close connection. By the implicit "return None" this will raise any exceptions after closing. """ self.conn.close()
[docs] def commit(self): """Commit transactions""" self.conn.commit()
[docs] def execute(self, expr, vals=None, commit=None): """ Run ``self.cursor.execute(expr, vals)`` with possibility of verbose output and commit. Multiple commands can by executed by separating them with a semicolon at the end of a line. If ``vals`` are supplied they will be applied to each of the commands. :param expr: SQL expression to execute :param vals: Values associated with the expression (optional) :param commit: Commit after executing C{expr} (default = self.autocommit) :rtype: None """ # Get a new cursor (implicitly closing any previous cursor) self.cursor = self.conn.cursor() for subexpr in expr.split(';\n'): if vals is not None: args = (subexpr, vals) else: args = (subexpr,) if self.verbose: print('Running:', args) self.cursor.execute(*args) if (commit is None and self.autocommit) or commit: self.commit()
[docs] def fetch(self, expr, vals=None,): """ Return a generator that will fetch one row at a time after executing with args. Example usage:: for row in db.fetch(expr, vals): print row['column'] :param expr: SQL expression to execute :param vals: Values associated with the expression (optional) :rtype: Generator that will get one row of database as dict() via next() """ self.execute(expr, vals, commit=False) cols = [x[0] for x in self.cursor.description] while True: vals = self.cursor.fetchone() if vals: yield dict(zip(cols, vals)) else: if self.autocommit: self.commit() self.cursor.close() break
[docs] def fetchone(self, expr, vals=None,): """Fetch one row after executing args. This always gets the first row of the SQL query. Use ska_dbi.fetch() to get multiple rows one at a time. Example usage:: row = db.fetchone(expr, vals) print row['column'] :param expr: SQL expression to execute :param vals: Values associated with the expression (optional) :rtype: One row of database as dict() """ try: val = next(self.fetch(expr, vals)) self.cursor.close() return val except StopIteration: return None
[docs] def fetchall(self, expr, vals=None): """Fetch all rows after executing args. Example usage:: rows = db.fetchall(expr, vals) print rows[1:5]['column'] :param expr: SQL expression to execute :param vals: Values associated with the expression (optional) :rtype: All rows of database as numpy.rec.recarray or list of dicts, depending on self.numpy """ self.execute(expr, vals, commit=False) cols = [x[0] for x in self.cursor.description] vals = self.cursor.fetchall() if self.autocommit: self.commit() self.cursor.close() if self.numpy and vals: import numpy # Would be good to set dtype explicitly from database info instead of # having numpy auto-determine types return numpy.rec.fromrecords(vals, names=cols) else: return [dict(zip(cols, x)) for x in vals]
[docs] def insert(self, row, tablename, replace=False, commit=None): """Insert data row into table tablename. :param row: Data row for insertion (dict or numpy.record) :param tablename: Table name :param replace: If true then replace database record if it already exists :param commit: Commit insertion (default = self.autocommit) :rtype: None """ # Get the column names, either from numpy methods or from dict keys try: cols = sorted(row.dtype.names) except AttributeError: cols = sorted(row.keys()) # Make a tuple of the values to insert if self.numpy: vals = tuple(_denumpy(row[x]) for x in cols) else: vals = tuple(row[x] for x in cols) # Create the insert command depending on dbi. Start with the column # value replacement strings colrepls = ('?',) * len(cols) insert_str = "INSERT %s INTO %s (%s) VALUES (%s)" replace_str = replace and 'OR REPLACE' or '' cmd = insert_str % (replace_str, tablename, ','.join(cols), ','.join(colrepls)) # Finally run the insert command self.execute(cmd, vals, commit=commit) self.cursor.close()