pysteve/lib/backends/files.py (191 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import hashlib import json import os import time from lib import constants class FileBasedBackend: homedir = None def __init__(self, config): self.homedir = config.get("general", "homedir") def document_exists(self, election, *issue): "Returns True if an election/issue exists, False otherwise" elpath = os.path.join(self.homedir, "issues", election) if issue: elpath += "/" + issue[0] + ".json" return os.path.isfile(elpath) else: return os.path.isdir(elpath) def get_basedata(self, election): "Get base data from an election" elpath = os.path.join(self.homedir, "issues", election) if os.path.isdir(elpath): with open(elpath + "/basedata.json", "r") as f: data = f.read() f.close() basedata = json.loads(data) if hideHash and 'hash' in basedata: del basedata['hash'] basedata['id'] = election return basedata return None def close(self, election, reopen = False): "Mark an election as closed" basedata = self.get_basedata(election) elpath = os.path.join(self.homedir, "issues", election) if reopen: basedata['closed'] = False else: basedata['closed'] = True with open(elpath + "/basedata.json", "w") as f: f.write(json.dumps(basedata)) f.close() def issue_get(self, electionID, issueID): "Get JSON data from an issue" issuedata = None ihash = "" issuepath = os.path.join(self.homedir, "issues", electionID, issueID) + ".json" if os.path.isfile(issuepath): with open(issuepath, "r") as f: data = f.read() ihash = hashlib.sha224(data).hexdigest() f.close() issuedata = json.loads(data) return issuedata, ihash def votes_get(self, electionID, issueID): "Read votes from the vote file" rvotes = getVotesRaw(electionID, issueID) votes = {} for key in rvotes: votes[key] = rvotes[key]['vote'] return {} def votes_get_raw(self, electionID, issueID): issuepath = os.path.join(self.homedir, "issues", electionID, issueID) + ".json.votes" if os.path.isfile(issuepath): with open(issuepath, "r") as f: votes = json.loads(f.read()) f.close() return votes return {} def vote_history(self, electionID, issueID): issuepath = os.path.join(self.homedir, "issues", electionID, issueID) + ".json.history" if os.path.isfile(issuepath): with open(issuepath, "r") as f: votes = json.loads(f.read()) f.close() return votes return [] def election_create(self, eid, basedata): elpath = os.path.join(self.homedir, "issues", eid) os.mkdir(elpath) with open(elpath + "/basedata.json", "w") as f: f.write(json.dumps(basedata)) f.close() with open(elpath + "/voters.json", "w") as f: f.write("{}") f.close() def election_update(self, electionID, basedata): elpath = os.path.join(self.homedir, "issues", electionID) with open(elpath + "/basedata.json", "w") as f: f.write(json.dumps(basedata)) f.close() def issue_update(self, electionID, issueID, issueData): issuepath = os.path.join(self.homedir, "issues", electionID, issueID) + ".json" with open(issuepath, "w") as f: f.write(json.dumps(issueData)) f.close() def issue_list(self, election): "List all issues in an election" issues = [] elpath = os.path.join(self.homedir, "issues", election) if os.path.isdir(elpath): issues = [f.strip(".json") for f in os.listdir(elpath) if os.path.isfile(os.path.join(elpath, f)) and f != "basedata.json" and f != "voters.json" and f.endswith(".json")] return issues def election_list(self): "List all elections" elections = [] path = os.path.join(self.homedir, "issues") if os.path.isdir(path): elections = [f for f in os.listdir(path) if os.path.isdir(os.path.join(path, f))] return elections def vote(self, electionID, issueID, uid, vote): "Casts a vote on an issue" votes = {} now = time.time() issuepath = os.path.join(self.homedir, "issues", electionID, issueID) + ".json" if os.path.isfile(issuepath + ".votes"): with open(issuepath + ".votes", "r") as f: votes = json.loads(f.read()) f.close() votes[uid] = { 'vote': vote, 'timestamp': now } with open(issuepath + ".votes", "w") as f: f.write(json.dumps(votes)) f.close() # history backlog vote_history = [] issuepath = os.path.join(self.homedir, "issues", electionID, issueID) + ".json" if os.path.isfile(issuepath + ".history"): with open(issuepath + ".history", "r") as f: vote_history = json.loads(f.read()) f.close() vote_history.append({ 'key': uid, 'data': { 'vote': vote, 'timestamp': now } }) with open(issuepath + ".history", "w") as f: f.write(json.dumps(vote_history)) f.close() def issue_delete(self, electionID, issueID): "Deletes an issue if it exists" issuepath = os.path.join(self.homedir, "issues", electionID, issueID) + ".json" if os.path.isfile(issuepath): os.unlink(issuepath) if os.path.isfile(issuepath + ".votes"): os.unlink(issuepath + ".votes") def issue_create(self, electionID, issueID, data): issuepath = os.path.join(self.homedir, "issues", electionID, issueID) + ".json" with open(issuepath, "w") as f: f.write(json.dumps(data)) f.close() def voter_get_uid(self, electionID, votekey): "Get vote UID/email with a given vote key hash" elpath = os.path.join(self.homedir, "issues", electionID) with open(elpath + "/voters.json", "r") as f: voters = json.loads(f.read()) f.close() for voter in voters: if voters[voter] == xhash: return voter return None def voter_add(self, election, PID, xhash): elpath = os.path.join(self.homedir, "issues", election) with open(elpath + "/voters.json", "r") as f: voters = json.loads(f.read()) f.close() voters[PID] = xhash with open(elpath + "/voters.json", "w") as f: f.write(json.dumps(voters)) f.close() def voter_remove(self, election, UID): elpath = os.path.join(self.homedir, "issues", election) with open(elpath + "/voters.json", "r") as f: voters = json.loads(f.read()) f.close() if UID in voters: del voters[UID] with open(elpath + "/voters.json", "w") as f: f.write(json.dumps(voters)) f.close() def voter_has_voted(self, election, issue, uid): path = os.path.join(self.homedir, "issues", election, issue) votes = {} if os.path.isfile(path + ".json.votes"): with open(path + ".json.votes", "r") as f: votes = json.loads(f.read()) f.close() return True if uid in votes else False constants.appendBackend("file", FileBasedBackend)