asfpy/sqlite.py (149 lines of code) (raw):

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with #the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. "SQLite document-store wrapper for ASF." if not __debug__: raise RuntimeError("This code requires Assertions to be enabled") import sqlite3 import typing DEFAULT_ISOLATION_LEVEL = None # When None, enables auto-commit mode in sqlite # https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.isolation_level class AsfpyDBError(Exception): pass class DB: def __init__(self, fp: str, isolation_level: typing.Optional[str] = DEFAULT_ISOLATION_LEVEL): self.connector = sqlite3.connect(fp, isolation_level=isolation_level) self.connector.row_factory = sqlite3.Row self.cursor = self.connector.cursor() # Need sqlite 3.25.x or higher for upserts self.upserts_supported: bool = (sqlite3.sqlite_version >= "3.25.0") def run(self, cmd: str, *args): """ Runs an SQLITE command in a cursor, but does not commit changes to disk @param cmd: The command to run @param args: Optional interpolated arguments """ self.cursor.execute(cmd, args) def runc(self, cmd: str, *args): """ Runs an SQLITE command and commits the changes to disk @param cmd: The command to run @param args: Optional interpolated arguments """ self.cursor.execute(cmd, args) self.connector.commit() def delete(self, table: str, **target): """ Deletes one or more matching entries from a table where a specific document key/value matches. @param table: The table to remove entries from @param target: Variable key/value pairs to form the selection match. For instance, user=janedoe """ if not target: raise AsfpyDBError("DELETE must have at least one defined target value for locating where to delete from") items = target.items() # Use the same ordering for keys/values search = " AND ".join("`%s` = ?" % uk for uk, uv in items) values = [uv for uk, uv in items] statement = f'DELETE FROM {table} WHERE {search}' self.runc(statement, *values) def update(self, table: str, document: dict, **target): """ Updates one or more rows in a table where the target key/value pair matches Example: update('accounts', { 'name': 'jane doe', 'password': '1245' }, user_id='janedoe') would update any rows where user_id is 'janedoe' and set name and password. @param table: The table to edit @param document: The document, as a dict, to update (target values) @param target: The search key/value pair for finding the right document. """ if not target: raise AsfpyDBError("UPDATE must have at one defined target to specify the row to update") k, v = next(iter(target.items())) items = document.items() # Use the same ordering for keys/values columns = ", ".join("%s = ?" % uk for uk, uv in items) statement = f'UPDATE {table} SET {columns} WHERE {k} = ?;' values = [uv for uk, uv in items] values.append(v) # unique constraint self.runc(statement, *values) def insert(self, table: str, document: dict): """ Inserts a row into a table @param table: The table to insert the row into @param document: The row data, as a dict, to insert. """ items = document.items() # Use the same ordering for keys/values columns = ", ".join("`%s`" % uk for uk, uv in items) questionmarks = ", ".join(['?'] * len(items)) statement = f'INSERT INTO {table} ({columns}) VALUES ({questionmarks});' values = [uv for uk, uv in items] self.runc(statement, *values) def upsert(self, table: str, document: dict, **target): """ Performs an upsert in a table with unique constraints. Insert if not present, update otherwise. @param table: The table to upsert into @param document: The document to insert/update (depending on whether it exists) @param target: Target search key/value parameters to look for existing document. """ # Always have the target identifier as part of the row if not target: raise AsfpyDBError("UPSERTs must have at least one defined target value for locating where to upsert") k, v = next(iter(target.items())) document[k] = v # table: foo # bar: 1 # baz: 2 # INSERT INTO foo (bar,baz) VALUES (?,?) ON CONFLICT (bar) DO UPDATE SET (bar=?, foo=?) WHERE bar=?,(1,2,1,2,1,) if self.upserts_supported: items = document.items() # Use the same ordering for keys/values variables = ", ".join("`%s`" % uk for uk, uv in items) questionmarks = ", ".join(['?'] * len(items)) upserts = ", ".join("`%s` = ?" % uk for uk, uv in items) statement = f'INSERT INTO {table} ({variables}) VALUES ({questionmarks}) ON CONFLICT({k}) DO UPDATE SET {upserts} WHERE {k} = ?;' # insert values, update values, and the unique constraint value values = ([uv for uk, uv in items] * 2) + [v] self.runc(statement, *values) # Older versions of sqlite do not support 'ON CONFLICT', so we'll have to work around that... else: try: # Try to insert self.insert(table, document) except sqlite3.IntegrityError: # Conflict, update instead self.update(table, document, **target) def fetch(self, table: str, limit: int = 1, **params) -> typing.Iterator[dict]: """ Searches a table for matching params, returns up to $limit items that match, as dicts in an iterator. If no limit is specified, returns all matches. @param table: Table to fetch rows from @param limit: The maximum number of rows to fetch. Can be None, for all rows @param params: Search parameters as key/value pairs @return: An iterator with all the found rows as dicts """ if params: items = params.items() # Use the same ordering for keys/values search = " AND ".join("`%s` = ?" % uk for uk, uv in items) values = [uv for uk, uv in items] else: search = "1" values = [] statement = f'SELECT * FROM {table} WHERE {search}' if limit: statement += f' LIMIT {limit}' rows_left = limit self.cursor.execute(statement, values) while True: rowset = self.cursor.fetchmany() if not rowset: return # break iteration for row in rowset: yield dict(row) if limit: rows_left -= len(rowset) assert rows_left >= 0 if rows_left == 0: return # break iteration def fetchone(self, table_name: str, **params) -> typing.Optional[dict]: """ Fetches a single row from a table, or None if no match was found. @param table_name: The table to search in @param params: Search parameters as key/value pairs @return: If a match was found, returns the matching row as a dict, else None """ try: return next(self.fetch(table_name, **params)) except StopIteration: # No more entries! return None def table_exists(self, table: str) -> bool: """ Checks if a table exists in the database @param table: The table to look for @return: Boolean True or False, depending on whether the table exists. """ return self.fetchone('sqlite_master', type='table', name=table) and True or False def test(dbname=':memory:'): testdb = db(dbname) cstatement = '''CREATE TABLE test ( foo varchar unique, bar varchar, baz real )''' # Create if not already here try: testdb.runc(cstatement) except sqlite3.OperationalError as e: # Table exists assert str(e) == "table test already exists" # Insert (may fail if already tested) try: testdb.insert('test', {'foo': 'foo1234', 'bar': 'blorgh', 'baz': 5}) except sqlite3.IntegrityError as e: assert str(e) == "UNIQUE constraint failed: test.foo" # This must fail try: testdb.insert('test', {'foo': 'foo1234', 'bar': 'blorgh', 'baz': 2}) except sqlite3.IntegrityError as e: assert str(e) == "UNIQUE constraint failed: test.foo" # This must pass testdb.upsert('test', {'foo': 'foo1234', 'bar': 'blorgssh', 'baz': 8}, foo='foo1234') # This should fail with no target specified try: testdb.upsert('test', {'foo': 'foo1234', 'bar': 'blorgssh', 'baz': 8}) except AsfpyDBError as e: assert str(e) == "UPSERTs must have at least one defined target value for locating where to upsert" # This should all pass testdb.update('test', {'foo': 'foo4321'}, foo='foo1234') obj = testdb.fetchone('test', foo='foo4321') assert type(obj) is dict and obj.get('foo') == 'foo4321' obj = testdb.fetch('test', limit=5, foo = 'foo4321') assert str(type(obj)) == "<class 'generator'>" assert next(obj).get('foo') == 'foo4321' obj = testdb.fetchone('test', foo='foo9999') assert obj is None testdb.delete('test', foo='foo4321') assert testdb.table_exists('test') assert not testdb.table_exists('test2') # Let's insert 1000 rows, and perform a repeated fetch. for i in range(1000): testdb.insert('test', {'foo': str(i), 'bar': str(i), 'baz': i}) count = 0 for row in testdb.fetch('test', limit=None): assert int(row['foo']) == count count += 1 assert count == 1000 # Change the arraysize, and run it again. testdb.cursor.arraysize = 97 # ensure last fetch is short count = 0 for row in testdb.fetch('test', limit=None): assert int(row['foo']) == count count += 1 assert count == 1000 # One more run, with a limit. Leave the arraysize. count = 0 for row in testdb.fetch('test', limit=30): assert int(row['foo']) == count count += 1 assert count == 30 # Backwards compatibility db = DB if __name__ == '__main__': test()