pysteve/lib/plugins/stv.py (313 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" STV Voting Plugin """
import re, random
# Candidate status values. Each is a distinct bit so that several states can
# be combined into one mask (e.g. ELECTED + HOPEFUL) and tested with `&`.
ELECTED = 1
HOPEFUL = 2
ELIMINATED = 4
# Set on a HOPEFUL candidate whose vote reached the quota, before that
# candidate is promoted to ELECTED.
ALMOST = 8
# Tolerance: a total surplus below this forces removal of the lowest
# candidate, and a quota below this is rejected as an internal error.
ERROR_MARGIN = 0.00001
# Added to the quota divisor when more than two seats are being filled.
BILLIONTH = 0.000000001
from lib import constants
import functools
# Module-level trace log. The counting functions append human-readable lines
# to it and tallySTV returns it under the 'debug' key. Nothing in this file
# clears it, so it keeps growing across tallies within one process.
debug = []
def makeLetter(num, version = 2):
    """Return the ballot label for the zero-based candidate index `num`.

    version 1 yields a single lowercase letter ('a' + num).
    Any other version yields two uppercase letters. Indices 0..25 map to
    'AA'..'AZ'. Above that, the first letter advances once for every 25
    (not 26) indices, so the sequence continues 'BB'..'BZ', 'CB'..'CZ', ...
    and labels such as 'BA' are never produced.
    """
    if version == 1:
        return chr(ord('a') + num)

    first = ord('A')
    if num > 25:
        # Closed form of "subtract 25 until num <= 25", counting the steps.
        steps = (num - 1) // 25
        first += steps
        num -= 25 * steps
    return chr(first) + chr(ord('A') + num)
def validateSTV(vote, issue):
    """Validate a space-separated STV ballot string.

    Returns a human-readable reason when the ballot is invalid, otherwise
    None. `issue` is accepted for interface compatibility and is not used.
    """
    # The first 100 version-2 labels (AA, AB, ...) plus '-', which tallySTV
    # treats as an abstention marker.
    accepted = [makeLetter(i) for i in range(100)] + ['-']
    tokens = vote.split(" ")

    # Only accepted symbols are checked for repetition; a repeated unknown
    # token is reported by the second check instead.
    if any(tokens.count(symbol) > 1 for symbol in accepted):
        return "Duplicate letters found"

    if any(token not in accepted for token in tokens):
        return "Invalid characters in vote. Accepted are: %s" % ", ".join(accepted)

    return None
def run_vote(names, votes, num_seats):
    """Run a full STV count and return the identifiers of the winners.

    names     -- iterable of candidate identifiers (iterating the dict that
                 tallySTV passes yields its ballot-letter keys)
    votes     -- mapping of voter -> ordered sequence of candidate identifiers
    num_seats -- number of seats to fill

    Returns the list produced by CandidateList.retval(), i.e. the names of
    every candidate whose final status is ELECTED. Progress messages are
    appended to the module-level `debug` list.
    """
    candidates = CandidateList(names, votes)

    # Replace identifiers on each ballot by the matching Candidate object.
    by_name = {cand.name: cand for cand in candidates.l}
    ballots = [[by_name[ident] for ident in ranking]
               for ranking in votes.values()]

    # Trivial outcomes: enough seats for everybody, or no seats at all.
    if candidates.count(ELECTED + HOPEFUL) <= num_seats:
        debug.append('All candidates elected')
        candidates.change_state(HOPEFUL, ELECTED)
        return candidates.retval()
    if num_seats <= 0:
        candidates.change_state(HOPEFUL, ELIMINATED)
        return candidates.retval()

    quota = None  # no quota exists before the first round
    round_no = 0
    while candidates.count(ELECTED) < num_seats:
        round_no += 1
        debug.append('Iteration %d' % round_no)
        quota = iterate_one(quota, ballots, candidates, num_seats)
        candidates.reverse_random()

    debug.append('All seats full')
    candidates.change_state(HOPEFUL, ELIMINATED)
    return candidates.retval()
class CandidateList(object):
    """The candidates of one count, held in the list `self.l`, plus bulk helpers."""

    def __init__(self, names, votes):
        """Create one Candidate per name, each with its own random tie-break value."""
        total = len(names)
        randoms = generate_random(total, votes)
        # Every candidate starts with `ahead` set to total - 1.
        self.l = [Candidate(name, rand, total - 1)
                  for name, rand in zip(names, randoms)]

    def count(self, state):
        """Return how many candidates have a status bit in common with `state`."""
        return sum(1 for cand in self.l if (cand.status & state) != 0)

    def retval(self):
        """Return the names of all ELECTED candidates, in list order."""
        return [cand.name for cand in self.l if cand.status == ELECTED]

    def change_state(self, from_state, to_state):
        """Move every candidate matching the `from_state` mask to `to_state`.

        Returns True if at least one candidate was changed.
        """
        touched = False
        for cand in self.l:
            if (cand.status & from_state) == 0:
                continue
            cand.status = to_state
            # elect()/eliminate() also log the change (and zero the weight
            # on elimination); other target states are just assigned.
            if to_state == ELECTED:
                cand.elect()
            elif to_state == ELIMINATED:
                cand.eliminate()
            touched = True
        return touched

    def reverse_random(self):
        """Mirror every tie-break value (r -> 1.0 - r) to get a new ordering.

        This alternates the value domain between [0.0, 1.0) and (0.0, 1.0].
        """
        for cand in self.l:
            cand.rand = 1.0 - cand.rand

    def adjust_weights(self, quota):
        """Rescale the weight of every ELECTED candidate for the given quota."""
        for cand in self.l:
            if cand.status == ELECTED:
                cand.adjust_weight(quota)

    def print_results(self):
        """Print one 'elected' / 'not elected' line per candidate."""
        for cand in self.l:
            verdict = ' ' if cand.status == ELECTED else ' not '
            print ('%-40s%selected' % (cand.name, verdict))

    def dbg_display_tables(self, excess):
        """Append a weight/vote table, the excess and the grand total to `debug`."""
        running_total = excess
        for cand in self.l:
            debug.append('%-20s %15.9f %15.9f' % (cand.name, cand.weight, cand.vote))
            running_total += cand.vote
        debug.append('%-20s %15s %15.9f' %( 'Non-transferable', ' ', excess))
        debug.append('%-20s %15s %15.9f' % ( 'Total', ' ', running_total))
class Candidate(object):
    """State of a single candidate during the count."""

    def __init__(self, name, rand, ahead):
        self.name = name
        self.rand = rand      # tie-break value supplied by the caller
        self.ahead = ahead    # ranking score, recomputed by calc_aheads()
        self.status = HOPEFUL
        self.weight = 1.0
        self.vote = None      # filled in once totals are calculated

    def elect(self):
        """Mark this candidate as ELECTED and log it."""
        self.status = ELECTED
        debug.append('Elected: %s' % self.name)

    def eliminate(self):
        """Mark this candidate as ELIMINATED, zero its weight and log it."""
        self.status = ELIMINATED
        self.weight = 0.0
        debug.append('Eliminated: %s' % self.name)

    def adjust_weight(self, quota):
        """Scale the weight by quota / vote."""
        assert quota is not None
        scaled = self.weight * quota
        self.weight = scaled / self.vote

    @staticmethod
    def cmp(a,b):
        """Three-way comparison: by `ahead` first, then by `vote`.

        Returns a negative number, zero or a positive number, suitable for
        functools.cmp_to_key.
        """
        if a.ahead != b.ahead:
            return -1 if a.ahead < b.ahead else 1
        return (a.vote > b.vote) - (a.vote < b.vote)
def iterate_one(quota, votes, candidates, num_seats):
    """Run one counting round and return the quota to use in the next one.

    quota is the value returned by the previous round (run_vote passes None
    for the first round). Returns None when all remaining candidates were
    elected outright because they no longer outnumber the seats.
    """
    # Precondition (from the caller): count(ELECTED) < num_seats.
    if candidates.count(ELECTED + HOPEFUL) > num_seats:
        candidates.adjust_weights(quota)
        changed, new_quota, surplus = recalc(votes, candidates, num_seats)

        # Nothing moved and there is no meaningful surplus left to
        # transfer: force progress by dropping the weakest candidate.
        if surplus < ERROR_MARGIN and not changed:
            debug.append('Remove Lowest (forced)')
            exclude_lowest(candidates.l)
        return new_quota

    debug.append('All remaining candidates elected')
    candidates.change_state(HOPEFUL, ELECTED)
    return None
def recalc(votes, candidates, num_seats):
    """Recount the ballots, then elect and/or remove candidates.

    Returns a tuple (any_changed, quota, surplus): whether any candidate
    changed status, the quota computed for this round, and the total
    surplus of the elected candidates.
    """
    # Totals first: everything below depends on the fresh vote counts.
    non_transferable = calc_totals(votes, candidates)
    calc_aheads(candidates)
    candidates.dbg_display_tables(non_transferable)

    quota = calc_quota(len(votes), non_transferable, num_seats)
    elected_any = elect(quota, candidates, num_seats)

    surplus = calc_surplus(quota, candidates)
    removed_any = try_remove_lowest(surplus, candidates)

    return elected_any | removed_any, quota, surplus
def calc_totals(votes, candidates):
    """Distribute every ballot over its ranked candidates.

    Resets each candidate's `vote` to 0.0, then walks each ballot in
    preference order. A HOPEFUL candidate takes whatever is left of the
    ballot. A candidate that is neither HOPEFUL nor ELIMINATED keeps the
    fraction `weight` and passes the rest on. ELIMINATED candidates are
    skipped.

    Returns the excess: the sum of ballot fractions nobody received.
    """
    for cand in candidates.l:
        cand.vote = 0.0

    excess = 0.0
    for ranking in votes:
        remaining = 1.0
        for cand in ranking:
            status = cand.status
            if status == HOPEFUL:
                cand.vote += remaining
                remaining = 0.0
                break
            if status == ELIMINATED:
                continue
            share = cand.weight * remaining  # portion kept by this candidate
            cand.vote += share
            remaining -= share
            if remaining == 0.0:  # ballot fully used up
                break
        excess += remaining
    return excess
def calc_aheads(candidates):
    """Recompute every candidate's `ahead` ranking score.

    Candidates are sorted with Candidate.cmp (previous `ahead`, then
    `vote`). Consecutive candidates that compare equal form one group, and
    every member of a group gets the same score: the sum of the first and
    last sorted index of that group.
    """
    c_sorted = sorted(candidates.l, key=functools.cmp_to_key(Candidate.cmp))
    last = 0  # index where the current group of equal candidates starts
    for i in range(1, len(c_sorted)+1):
        # Group boundaries must be detected with Candidate.cmp. Candidate
        # defines no __eq__/__ne__, so `c_sorted[last] != c_sorted[i]` is an
        # identity test that is always true for two distinct objects; tied
        # candidates would then never share an `ahead` value.
        if i == len(c_sorted) or Candidate.cmp(c_sorted[last], c_sorted[i]) != 0:
            for c in c_sorted[last:i]:
                c.ahead = (i - 1) + last
            last = i
def calc_quota(num_voters, excess, num_seats):
    """Return the quota: (num_voters - excess) divided by (num_seats + 1).

    When more than two seats are being filled, BILLIONTH is added to the
    divisor. Raises Exception if the resulting quota is below ERROR_MARGIN.
    The quota is also logged to `debug`.
    """
    divisor = float(num_seats + 1)
    if num_seats > 2:
        divisor = divisor + BILLIONTH

    quota = (float(num_voters) - excess) / divisor
    if quota < ERROR_MARGIN:
        raise Exception('Internal Error - very low quota')

    debug.append('Quota = %.9f' % quota)
    return quota
def elect(quota, candidates, num_seats):
    """Promote candidates that reached the quota; return True if anything changed.

    HOPEFUL candidates with vote >= quota are first marked ALMOST. If that
    would fill more seats than exist, every remaining HOPEFUL candidate is
    eliminated and the lowest candidate is excluded, repeatedly, until the
    ELECTED + ALMOST count fits. The remaining ALMOST candidates then
    become ELECTED.
    """
    for cand in candidates.l:
        if cand.status == HOPEFUL and cand.vote >= quota:
            cand.status = ALMOST

    changed = False
    while True:
        seated = candidates.count(ELECTED + ALMOST)
        if seated <= num_seats:
            break
        debug.append('Vote tiebreaker! voters: %d seats: %d' %( seated, num_seats))
        candidates.change_state(HOPEFUL, ELIMINATED)
        exclude_lowest(candidates.l)
        changed = True  # exclude_lowest() always eliminates someone

    promoted = candidates.change_state(ALMOST, ELECTED)
    return changed | promoted
def calc_surplus(quota, candidates):
    """Return (and log) the sum of (vote - quota) over all ELECTED candidates."""
    surplus = 0.0
    for cand in candidates.l:
        if cand.status != ELECTED:
            continue
        surplus += cand.vote - quota

    debug.append('Total Surplus = %.9f' % surplus)
    return surplus
def try_remove_lowest(surplus, candidates):
    """Eliminate the weakest HOPEFUL candidate if the surplus cannot save it.

    The HOPEFUL candidate with the lowest vote is eliminated when the gap
    to the next-lowest non-eliminated candidate is larger than `surplus`.
    Returns True if a candidate was eliminated, otherwise False.
    """
    ceiling = 1e18  # starting bound for both minimum searches

    # First strictly-lowest HOPEFUL candidate.
    weakest = None
    weakest_vote = ceiling
    for cand in candidates.l:
        if cand.status != HOPEFUL:
            continue
        if cand.vote < weakest_vote:
            weakest_vote = cand.vote
            weakest = cand

    if not weakest:
        debug.append("Could not find a subject to eliminate")
        return False

    # Lowest vote among all other candidates still in the running.
    next_vote = ceiling
    for cand in candidates.l:
        if cand != weakest and cand.status != ELIMINATED and cand.vote < next_vote:
            next_vote = cand.vote

    gap = next_vote - weakest_vote
    if gap >= 0.0:
        debug.append('Lowest Difference = %.9f - %.9f = %.9f' % ( next_vote, weakest_vote, gap))

    if gap > surplus:
        debug.append('Remove Lowest (unforced)')
        weakest.eliminate()
        return True
    return False
def exclude_lowest(candidates):
    """Eliminate the HOPEFUL/ALMOST candidate with the lowest `ahead` score.

    candidates -- list of Candidate objects; it is shuffled IN PLACE with
                  the module-level random generator before the scan.

    Candidates sharing the lowest `ahead` are separated by their `rand`
    value (lowest wins the elimination), and that fact is logged to `debug`.
    Fails the `assert` if no HOPEFUL or ALMOST candidate exists.
    """
    ### use: ahead = len(candidates) ??
    ahead = 1000000000.  # greater than any possible candidate.ahead
    rand = 1.1  # greater than any possible candidate.rand
    which = None
    # Must be bound before the loop: when no candidate qualifies, the check
    # after the loop would otherwise raise UnboundLocalError before the
    # `assert which` diagnostic is reached.
    use_rand = False

    random.shuffle(candidates)
    for c in candidates:
        if c.status == HOPEFUL or c.status == ALMOST:
            if c.ahead < ahead:
                # New strict minimum: any earlier tie no longer matters.
                ahead = c.ahead
                rand = c.rand
                which = c
                use_rand = False
            elif c.ahead == ahead:
                # Tie on `ahead`: fall back to the random value.
                use_rand = True
                if c.rand < rand:
                    rand = c.rand
                    which = c

    if use_rand:
        debug.append('Random choice used!')

    assert which
    which.eliminate()
def generate_random(count, votes):
    """Return `count` distinct pseudo-random floats in [0.0, 1.0).

    The module-level generator is seeded with len(votes), so the sequence
    is repeatable for a given number of votes.
    """
    random.seed(len(votes))  # Seed based on number of votes

    # Redraw until all values are distinct. Without the retry, a draw
    # containing a duplicate would fall through and return None.
    while True:
        # Generate COUNT values in [0.0, 1.0)
        values = [random.random() for _ in range(count)]
        if len(set(values)) == len(values):
            return values
# STV Tally:
# Version 1 == old style abcdefg...
# Version 2 == new style AA,AB,AC,AD...
def tallySTV(votes, issue, version = 2):
    """Tally an STV issue.

    votes   -- mapping of voter -> ballot string; ballots containing '-'
               are treated as abstentions and ignored
    issue   -- mapping with a 'type' of the form 'stv<seats>' and a
               'candidates' list of mappings that each have a 'name'
    version -- ballot label scheme (see makeLetter); with version 2 a
               failed count is retried once as version 1

    Returns a tuple (result_dict, summary_text).
    Raises Exception("Not an STV vote!") if issue['type'] is not an STV type.
    """
    rvotes = {}
    ovotes = votes  # keep the unfiltered input for the version-1 retry

    # Cut out abstained votes.
    for voter in votes:
        ballot = votes[voter]
        if ballot.find("-") == -1:
            if version == 1:
                # Old style: each character of the string is one preference.
                rvotes[voter] = ballot
            else:
                rvotes[voter] = re.split(r"[\s,]", ballot)
    votes = rvotes

    m = re.match(r"stv(\d+)", issue['type'])
    if not m:
        raise Exception("Not an STV vote!")
    numseats = int(m.group(1))

    # ballot label -> candidate display name
    candidates = {}
    for z, c in enumerate(issue['candidates']):
        candidates[makeLetter(z, version)] = c['name']

    # run the stv calc
    # Try version 2 first, fall back to version 1 if it breaks
    if version == 2:
        try:
            winners = run_vote(candidates, votes, numseats)
        except Exception:
            # Deliberately broad, but KeyboardInterrupt and SystemExit are
            # no longer swallowed the way a bare `except:` would.
            return tallySTV(ovotes, issue, 1)
    else:
        winners = run_vote(candidates, votes, numseats)

    winnernames = [candidates[c] for c in winners]

    # Return the data
    # NOTE(review): `debug` is the shared module-level list and is never
    # reset, so it also contains lines from earlier tallies -- confirm
    # whether callers rely on that.
    return {
        'votes': len(rvotes),
        'winners': winners,
        'winnernames': winnernames,
        'debug': debug,
        'version': version
    }, """
Winners:
- %s
""" % "\n - ".join(winnernames)
# Register the single-seat vote type separately so its description can use
# the singular "seat".
constants.appendVote({
    'key': "stv1",
    'description': "Single Transferable Vote with 1 seat",
    'category': 'stv',
    'validate_func': validateSTV,
    'vote_func': None,
    'tally_func': tallySTV
})

# Add ad nauseam: one vote type per seat count from 2 up to
# constants.MAX_NUM inclusive.
for i in range(2, constants.MAX_NUM + 1):
    constants.appendVote({
        'key': "stv%u" % i,
        'description': "Single Transferable Vote with %u seats" % i,
        'category': 'stv',
        'validate_func': validateSTV,
        'vote_func': None,
        'tally_func': tallySTV
    })