pysteve/lib/plugins/stv.py (313 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """ STV Voting Plugin """ import re, random ELECTED = 1 HOPEFUL = 2 ELIMINATED = 4 ALMOST = 8 ERROR_MARGIN = 0.00001 BILLIONTH = 0.000000001 from lib import constants import functools debug = [] def makeLetter(num, version = 2): if version == 1: return chr(ord('a')+num) else: x = ord('A') while num > 25: x += 1 num -= 25 letter = chr(x) + chr(ord('A') + num) return letter def validateSTV(vote, issue): "Tries to validate a vote, returns why if not valid, None otherwise" # aa, ab, ac, ad.... az, ba, bb, bc, bd...bz, ca, cb, cc, cd... letters = [makeLetter(x) for x in range(0,100)] letters.append('-') seats = vote.split(" ") for char in letters: if seats.count(char) > 1: return "Duplicate letters found" for char in seats: if char not in letters: return "Invalid characters in vote. Accepted are: %s" % ", ".join(letters) return None def run_vote(names, votes, num_seats): candidates = CandidateList(names, votes) # name -> Candidate remap = dict((c.name, c) for c in candidates.l) # Turn VOTES into a list of ordered-lists of Candidate objects letter = [] votes = [[remap[n] for n in choices] for choices in votes.values()] if candidates.count(ELECTED + HOPEFUL) <= num_seats: debug.append('All candidates elected') candidates.change_state(HOPEFUL, ELECTED) return candidates.retval() if num_seats <= 0: candidates.change_state(HOPEFUL, ELIMINATED) return candidates.retval() quota = None # not used on first pass iteration = 1 while candidates.count(ELECTED) < num_seats: debug.append('Iteration %d' % iteration) iteration += 1 quota = iterate_one(quota, votes, candidates, num_seats) candidates.reverse_random() debug.append('All seats full') candidates.change_state(HOPEFUL, ELIMINATED) return candidates.retval() class CandidateList(object): def __init__(self, names, votes): num_cand = len(names) randset = generate_random(num_cand, votes) self.l = [ ] for n, r in zip(names, randset): c = Candidate(n, r, num_cand-1) self.l.append(c) def count(self, state): count = 0 for c in self.l: if (c.status & state) != 0: count += 1 return count def retval(self): winners = [] for c in self.l: if c.status == ELECTED: winners.append(c.name) return winners def change_state(self, from_state, to_state): any_changed = False for c in self.l: if (c.status & from_state) != 0: c.status = to_state if to_state == ELECTED: c.elect() elif to_state == ELIMINATED: c.eliminate() any_changed = True return any_changed def reverse_random(self): # Flip the values to create a different ordering among candidates. Note # that this will alternate the domain between [0.0, 1.0) and (0.0, 1.0] for c in self.l: c.rand = 1.0 - c.rand def adjust_weights(self, quota): for c in self.l: if c.status == ELECTED: c.adjust_weight(quota) def print_results(self): for c in self.l: print ('%-40s%selected' % (c.name, c.status == ELECTED and ' ' or ' not ')) def dbg_display_tables(self, excess): total = excess for c in self.l: debug.append('%-20s %15.9f %15.9f' % (c.name, c.weight, c.vote)) total += c.vote debug.append('%-20s %15s %15.9f' %( 'Non-transferable', ' ', excess)) debug.append('%-20s %15s %15.9f' % ( 'Total', ' ', total)) class Candidate(object): def __init__(self, name, rand, ahead): self.name = name self.rand = rand self.ahead = ahead self.status = HOPEFUL self.weight = 1.0 self.vote = None # calculated later def elect(self): self.status = ELECTED debug.append('Elected: %s' % self.name) def eliminate(self): self.status = ELIMINATED self.weight = 0.0 debug.append('Eliminated: %s' % self.name) def adjust_weight(self, quota): assert quota is not None self.weight = (self.weight * quota) / self.vote @staticmethod def cmp(a,b): if a.ahead < b.ahead: return -1 if a.ahead == b.ahead: return (a.vote > b.vote) - (a.vote < b.vote) return 1 def iterate_one(quota, votes, candidates, num_seats): # assume that: count(ELECTED) < num_seats if candidates.count(ELECTED + HOPEFUL) <= num_seats: debug.append('All remaining candidates elected') candidates.change_state(HOPEFUL, ELECTED) return None candidates.adjust_weights(quota) changed, new_quota, surplus = recalc(votes, candidates, num_seats) if not changed and surplus < ERROR_MARGIN: debug.append('Remove Lowest (forced)') exclude_lowest(candidates.l) return new_quota def recalc(votes, candidates, num_seats): excess = calc_totals(votes, candidates) calc_aheads(candidates) candidates.dbg_display_tables(excess) quota = calc_quota(len(votes), excess, num_seats) any_changed = elect(quota, candidates, num_seats) surplus = calc_surplus(quota, candidates) any_changed |= try_remove_lowest(surplus, candidates) return any_changed, quota, surplus def calc_totals(votes, candidates): for c in candidates.l: c.vote = 0.0 excess = 0.0 for choices in votes: vote = 1.0 for c in choices: if c.status == HOPEFUL: c.vote += vote vote = 0.0 break if c.status != ELIMINATED: wv = c.weight * vote # weighted vote c.vote += wv vote -= wv if vote == 0.0: # nothing left to distribute break excess += vote return excess def calc_aheads(candidates): c_sorted = sorted(candidates.l, key=functools.cmp_to_key(Candidate.cmp)) last = 0 for i in range(1, len(c_sorted)+1): if i == len(c_sorted) or c_sorted[last] != c_sorted[i]: for c in c_sorted[last:i]: c.ahead = (i - 1) + last last = i def calc_quota(num_voters, excess, num_seats): if num_seats > 2: quota = (float(num_voters) - excess) / (float(num_seats + 1) + BILLIONTH) else: quota = (float(num_voters) - excess) / float(num_seats + 1) if quota < ERROR_MARGIN: raise Exception('Internal Error - very low quota') debug.append('Quota = %.9f' % quota) return quota def elect(quota, candidates, num_seats): for c in candidates.l: if c.status == HOPEFUL and c.vote >= quota: c.status = ALMOST any_changed = False while candidates.count(ELECTED + ALMOST) > num_seats: debug.append('Vote tiebreaker! voters: %d seats: %d' %( candidates.count(ELECTED + ALMOST), num_seats)) candidates.change_state(HOPEFUL, ELIMINATED) exclude_lowest(candidates.l) any_changed = True # we changed the candidates via exclude_lowest() any_changed |= candidates.change_state(ALMOST, ELECTED) return any_changed def calc_surplus(quota, candidates): surplus = 0.0 for c in candidates.l: if c.status == ELECTED: surplus += c.vote - quota debug.append('Total Surplus = %.9f' % surplus) return surplus def try_remove_lowest(surplus, candidates): lowest1 = 1e18 lowest2 = 1e18 which = None for c in candidates.l: if c.status == HOPEFUL and c.vote < lowest1: lowest1 = c.vote which = c if not which: debug.append("Could not find a subject to eliminate") return False for c in candidates.l: if c != which and c.status != ELIMINATED and c.vote < lowest2: lowest2 = c.vote diff = lowest2 - lowest1 if diff >= 0.0: debug.append('Lowest Difference = %.9f - %.9f = %.9f' % ( lowest2, lowest1, diff)) if diff > surplus: debug.append('Remove Lowest (unforced)') which.eliminate() return True return False def exclude_lowest(candidates): ### use: ahead = len(candidates) ?? ahead = 1000000000. # greater than any possible candidate.ahead rand = 1.1 # greater than any possible candidate.rand which = None used_rand = False random.shuffle(candidates) for c in candidates: if c.status == HOPEFUL or c.status == ALMOST: if c.ahead < ahead: ahead = c.ahead rand = c.rand which = c use_rand = False elif c.ahead == ahead: use_rand = True if c.rand < rand: rand = c.rand which = c if use_rand: debug.append('Random choice used!') assert which which.eliminate() def generate_random(count, votes): random.seed(len(votes)) # Seed based on number of votes # Generate COUNT values in [0.0, 1.0) values = [random.random() for x in range(count)] # Ensure there are no duplicates for value in values: if values.count(value) > 1: break else: # The loop finished without breaking out return values # STV Tally: # Version 1 == old style abcdefg... # Version 2 == new style AA,AB,AC,AD... def tallySTV(votes, issue, version = 2): rvotes = {} ovotes = votes # Cut out abstained votes. for vote in votes: if votes[vote].find("-") == -1: if version == 1: rvotes[vote] = votes[vote] else: rvotes[vote] = re.split(r"[\s,]", votes[vote]) votes = rvotes m = re.match(r"stv(\d+)", issue['type']) if not m: raise Exception("Not an STV vote!") numseats = int(m.group(1)) candidates = {} z = 0 for c in issue['candidates']: candidates[makeLetter(z, version)] = c['name'] z += 1 # run the stv calc # Try version 2 first, fall back to version 1 if it breaks winners = [] if version == 2: try: winners = run_vote(candidates, votes, numseats) except: return tallySTV(ovotes, issue, 1) else: winners = run_vote(candidates, votes, numseats) winnernames = [] for c in winners: winnernames.append(candidates[c]) # Return the data return { 'votes': len(rvotes), 'winners': winners, 'winnernames': winnernames, 'debug': debug, 'version': version }, """ Winners: - %s """ % "\n - ".join(winnernames) constants.appendVote ( { 'key': "stv1", 'description': "Single Transferable Vote with 1 seat", 'category': 'stv', 'validate_func': validateSTV, 'vote_func': None, 'tally_func': tallySTV }, ) # Add ad nauseam for i in range(2,constants.MAX_NUM+1): constants.appendVote ( { 'key': "stv%u" % i, 'description': "Single Transferable Vote with %u seats" % i, 'category': 'stv', 'validate_func': validateSTV, 'vote_func': None, 'tally_func': tallySTV }, )