pysteve/lib/backends/sqlite.py (182 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import json import time from lib import constants import asfpy.sqlite """SQLite database wrapper for STeVe v2 (PySTeVe)""" DB_CREATE_STATEMENTS = ( """ CREATE TABLE "elections" ( "id" TEXT PRIMARY KEY UNIQUE, "title" TEXT NOT NULL, "owner" TEXT NOT NULL, "monitors" TEXT NOT NULL, "starts" INTEGER, "ends" INTEGER, "hash" TEXT NOT NULL, "open" TEXT NOT NULL, "closed" INTEGER );""", """ CREATE TABLE "issues" ( "id" TEXT PRIMARY KEY UNIQUE, "election" TEXT NOT NULL, "title" TEXT NOT NULL, "description" TEXT NOT NULL, "type" TEXT NOT NULL, "candidates" TEXT, "seconds" TEXT, "nominatedby" TEXT );""", """ CREATE TABLE "votes" ( "eid" TEXT PRIMARY KEY UNIQUE, "issue" TEXT NOT NULL, "election" TEXT NOT NULL, "key" TEXT NOT NULL, "data" TEXT NOT NULL );""", """ CREATE TABLE "voters" ( "id" TEXT PRIMARY KEY UNIQUE, "election" TEXT NOT NULL, "hash" TEXT NOT NULL, "uid" TEXT NOT NULL );""", """ CREATE TABLE "vote_history" ( "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, "eid" TEXT NOT NULL, "issue" TEXT NOT NULL, "election" TEXT NOT NULL, "key" TEXT NOT NULL, "data" TEXT NOT NULL ); """, ) class SteveSQLite(object): def __init__(self, config): self.config = config self.dbname = config.get("sqlite", "database") self.db = asfpy.sqlite.DB(self.dbname) if not self.db.table_exists("elections"): for stmt in DB_CREATE_STATEMENTS: self.db.runc(stmt) def pickle(doc: dict): ndoc = {} for k, v in doc.items(): if isinstance(v, list) or isinstance(v, dict) or (isinstance(v, str) and v.startswith("%JSON%:")): v = "%JSON%:" + json.dumps(v) ndoc[k] = v return ndoc def unpickle(doc: dict): ndoc = {} for k, v in doc.items(): if isinstance(v, str) and v.startswith("%JSON%:"): try: v = json.loads(v[7:]) except json.JSONDecodeError: pass ndoc[k] = v return ndoc class SQLiteBackend: def __init__(self, config): "Init - get config and turn it into an ES instance" self.DB = SteveSQLite(config) def document_exists(self, election, *issue): "Does this election or issue exist?" eid = election if issue and issue[0]: return self.DB.db.fetchone("issues", election=election, id=issue[0]) return self.DB.db.fetchone("elections", id=eid) def get_basedata(self, election): "Get base data from an election" res = self.DB.db.fetchone("elections", id=election) if res: return unpickle(res) def close(self, election, reopen=False): "Mark an election as closed" basedata = self.get_basedata(election) if reopen == True: basedata["closed"] = 0 else: basedata["closed"] = 1 self.DB.db.update("elections", pickle(basedata), id=election) def issue_get(self, electionID, issueID): "Get JSON data from an issue" issuedata = None ihash = "" res = self.DB.db.fetchone("issues", id=issueID, election=electionID) if res: issuedata = unpickle(res) ihash = constants.hexdigest(json.dumps(issuedata)) return issuedata, ihash def votes_get(self, electionID, issueID): "Read votes and return as a dict" res = [unpickle(x) for x in self.DB.db.fetch("votes", limit=None, election=electionID, issue=issueID)] if res: votes = {} for entry in res: votes[entry["key"]] = entry["data"]["vote"] return votes return {} def votes_get_raw(self, electionID, issueID): "Read votes and return raw format" res = [unpickle(x) for x in self.DB.db.fetch("votes", limit=None, election=electionID, issue=issueID)] if res: votes = [] for entry in res: votes.append(entry) return votes return {} def vote_history(self, electionID, issueID): "Read vote history and return raw format" res = [unpickle(x) for x in self.DB.db.fetch("vote_history", limit=None, election=electionID, issue=issueID)] if res: votes = [] for entry in res: votes.append(entry) return votes return [] def election_create(self, _electionid, basedata): """Create a new election""" self.DB.db.insert("elections", pickle(basedata)) def election_update(self, electionID, basedata): "Update an election with new data" self.DB.db.update("elections", pickle(basedata), id=electionID) def issue_update(self, electionID, issueID, issueData): "Update an issue with new data" # Make a clean issue document that only has the sqlite-defined fields in it issue_fields = ("id", "election", "title", "description", "type", "candidates", "seconds", "nominatedby") new_data = {k:v for k,v in issueData.items() if k in issue_fields} self.DB.db.update("issues", pickle(new_data), id=issueID, election=electionID) def issue_list(self, election): "List all issues in an election" issues = [x["id"] for x in self.DB.db.fetch("issues", limit=None, election=election)] return issues def election_list(self): "List all elections" elections = [x['id'] for x in self.DB.db.fetch("elections", limit=None)] return elections def vote(self, electionID, issueID, uid, vote, vhash=None): "Casts a vote on an issue" eid = constants.hexdigest(electionID + ":" + issueID + ":" + uid) now = time.time() if vhash: eid = vhash doc = pickle( {"eid": eid, "issue": issueID, "election": electionID, "key": uid, "data": {"timestamp": now, "vote": vote}} ) self.DB.db.upsert("votes", doc, eid=eid) self.DB.db.insert("vote_history", doc) def issue_delete(self, electionID, issueID): "Deletes an issue if it exists" self.DB.db.delete("issues", election=electionID, id=issueID) def issue_create(self, electionID, issueID, data): "Create an issue" # iid = hashlib.sha224((electionID + "/" + issueID).encode("utf-8")).hexdigest() self.DB.db.insert("issues", pickle(data)) def voter_get_uid(self, electionID, votekey): "Get the UID/email for a voter given the vote key hash" # First, try the raw hash as an ID res = self.DB.db.fetchone("voters", id=votekey) if res: return unpickle(res)['uid'] # Try looking for hash key res = self.DB.db.fetchone("voters", hash=votekey) if res: return unpickle(res)['uid'] return False # No ballot found. def voter_add(self, election, PID, xhash): "Add a voter to the DB" eid = constants.hexdigest(election + ":" + PID) doc = pickle({"id": eid, "election": election, "hash": xhash, "uid": PID}) self.DB.db.upsert("voters", doc, id=eid) def ballot_scrub(self, election, xhash, uid=None): "Scrub a ballot" if uid: xhash = constants.hexdigest(election + ":" + uid) # Find ballots and votes matching bid = self.voter_get_uid(election, xhash) if not bid: return None issues = self.issue_list(election) for issue in issues: vhash = constants.hexdigest(xhash + issue) try: self.DB.db.delete("votes", eid=vhash) except: pass return True def voter_remove(self, election, UID): "Remove the voter with the given UID" votehash = constants.hexdigest(election + ":" + UID) self.DB.db.delete("voters", id=votehash) def voter_has_voted(self, election, issue, uid): "Return true if the voter has voted on this issue, otherwise false" # a vote trail can be either the old eid or the vhash. eid = constants.hexdigest(election + ":" + issue + ":" + uid) vhash = constants.hexdigest(constants.hexdigest(election + ":" + uid) + issue) try: return self.DB.db.fetchone("votes", eid=eid) or self.DB.db.fetchone("votes", eid=vhash) except: return False def voter_ballots(self, UID): """Find all elections (and ballots) this user has participated in""" # First, get all elections elections = {} res = [unpickle(x) for x in self.DB.db.fetch("elections", limit=None)] for election in res: # Mark election open or closed elections[election["id"]] = {"title": election["title"], "open": False if election["closed"] else True} # Then, get all ballots and note whether they still apply or not ballots = {} res = [pickle(x) for x in self.DB.db.fetch("voters", limit=100, uid=UID)] for ballot in res: ballots[ballot["election"]] = {"ballot": ballot["id"], "metadata": elections[ballot["election"]]} return ballots constants.appendBackend("sqlite", SQLiteBackend)