monitoring/stv_tool.py (302 lines of code) (raw):

#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
# The algorithms in this file are lifted from:
# http://votesystem.cvs.sourceforge.net/viewvc/votesystem/votesystem/src/VoteMain.java?view=markup
# ... there is possibly some room for a "more Pythonic" approach. The
# conversion below was done in a straight-forward manner to ensure a
# true and correct algorithm conversion. The debug output precisely
# matches the output of VoteMain.java.
#

import sys
import os.path
import random
import argparse
import configparser
import re
import functools
import json

# Candidate states. These are bit flags: CandidateList.count() and
# CandidateList.change_state() accept a sum of flags as a mask.
ELECTED = 1
HOPEFUL = 2
ELIMINATED = 4
ALMOST = 8  # reached quota in elect(), pending the tiebreak check

ERROR_MARGIN = 0.00001
BILLIONTH = 0.000000001

# "[" date "]" voterhash votes
RE_VOTE = re.compile(r'\[.{19}\]\s+'
                     r'(?P<voterhash>[\w\d]{32})\s+'
                     r'(?P<votes>[a-z]{1,26})', re.I)

# Toggled by main() via --verbose; gates all dbg() output.
VERBOSE = False


#@deprecated
def load_votes(fname):
    """Used by WHATIF.PY. Returns a list of names, and a list of vote-lists.

    FNAME is a raw votefile; the nominee names are read from the sibling
    'board_nominations.ini' (see read_nominees()).

    Raises Exception if FNAME is in the "rank order" format, which this
    script cannot parse.
    """
    # Only the first line is needed to detect the file format.
    with open(fname) as fp:
        first_line = fp.readline()

    if first_line.strip() == 'rank order':
        # The input file was processed by nstv-rank.py, or somehow otherwise
        # converted to the standard input for VoteMain.jar
        # (line 2: comma-separated names; line 3: comma-separated labels;
        # following lines: voter, then that voter's labels.)
        ### this is broken. We need to return a list of lists. Not a dict.
        ### we do not want to flow "who" voted -- that should be discarded.
        raise Exception("This script cannot parse the provided input file. "
                        "Fix the script.")

    # Map from "a".."z" to human names.
    labelmap = read_nominees(fname)

    # Construct a label-sorted list of names from the labelmap.
    names = [name for _, name in sorted(labelmap.items())]

    # Load the raw votes that were recorded. (eg. "kbaf")
    votes_by_label = read_votefile(fname)

    # Remap all labels to names in the votes.
    votes = [[labelmap[l] for l in vote] for vote in votes_by_label]

    return names, votes


def read_votefile(fname):
    """Return a list of votestrings, throwing out who produced each.

    Note: the file is time-ordered, and later votes override any prior
    vote from a specific voter.
    """
    votes = {}
    with open(fname) as fp:
        for line in fp:
            match = RE_VOTE.match(line)
            if match:
                # For a given voter hashcode, record their latest set
                # of votes.
                votes[match.group('voterhash')] = match.group('votes')

    # Discard voterhash, and just return the list of votes. Materialize
    # the dict view so callers get a real list, as documented.
    return list(votes.values())


def read_jsonvotes(fname):
    "Return a list of votestrings."
    with open(fname) as fp:
        data = json.load(fp)

    votes = []
    for v in data['votes'].values():
        # Ignore the "null" votes, for STV purposes.
        if (vote := v['vote']) != '-':
            votes.append(vote)
    return votes


def read_nominees(votefile):
    """Return a label map for a given votefile.

    For VOTEFILE (raw_board_votes.txt or .json), this looks at the sibling
    file to map from vote label to a human name.
    """
    ini_fname = os.path.join(os.path.dirname(votefile),
                             'board_nominations.ini')
    return read_labelmap(ini_fname)


def read_labelmap(fname):
    """Return the [nominees] section of the INI file FNAME as a dict.

    On a missing or unparseable file, an error is printed to stderr and
    the process exits with status 2.
    """
    if not os.path.exists(fname):
        print(f'ERROR: "{fname}" could not be found.', file=sys.stderr)
        sys.exit(2)

    config = configparser.ConfigParser()
    try:
        config.read(fname)
        return dict(config.items('nominees'))
    except configparser.Error:
        # Covers malformed files and a missing [nominees] section, without
        # swallowing unrelated exceptions such as KeyboardInterrupt.
        print(f'ERROR: could not process input file, "{fname}".',
              file=sys.stderr)
        sys.exit(2)


def run_stv(names: list, ordered_votes: list, num_seats: int):
    """Run the STV tally and return the resulting CandidateList.

    Each Candidate in the returned list has a final .status of ELECTED
    or ELIMINATED.
    """
    # Note: ORDERED_VOTES is a list of lists. The interior lists are a list
    # of candidate (human) NAMES, as ordered by the voter. The outer list is
    # simply all the votes that were recorded.

    # NOTE: files use labels such as "a" or "f", but this STV algorithm
    # works with the human names. Cuz why not.

    # NOTE: NAMES must be a list for repeatability purposes. It does not
    # need to obey any particular ordering rules, but when re-running
    # tallies, NAMES must be presented with the same ordering.
    assert len(set(names)) == len(names), "duplicates present!"
    assert num_seats > 0

    candidates = CandidateList(names)

    # name -> Candidate
    remap = dict((c.name, c) for c in candidates.l)

    # We can test that ordering of voters has no bearing. Perform runs
    # and alter the .seed() value.
    #ordered_votes = list(ordered_votes); random.seed(1); random.shuffle(ordered_votes); print('VOTE:', ordered_votes[0])

    # VOTES is a list of ordered-lists of Candidate objects
    votes = [[remap[n] for n in choices] for choices in ordered_votes]

    if candidates.count(ELECTED + HOPEFUL) <= num_seats:
        dbg('All candidates elected')
        candidates.change_state(HOPEFUL, ELECTED)
        return candidates

    quota = None  # not used on first pass
    iteration = 1
    while candidates.count(ELECTED) < num_seats:
        dbg('Iteration %d', iteration)
        iteration += 1
        quota = iterate_one(quota, votes, candidates, num_seats)
        candidates.reverse_random()

    dbg('All seats full')
    candidates.change_state(HOPEFUL, ELIMINATED)

    return candidates


class CandidateList(object):
    """The ordered collection of Candidate objects taking part in a tally.

    The candidates are held in the list attribute .l, in the same order
    as the NAMES passed to the constructor.
    """

    def __init__(self, names):
        num_cand = len(names)
        randset = generate_random(num_cand)
        #dbg('RANDSET: %s', randset)

        # Every candidate starts with the same .ahead value (num_cand - 1).
        self.l = [Candidate(n, r, num_cand - 1)
                  for n, r in zip(names, randset)]

    def count(self, state):
        "Return how many candidates have any of the STATE bits set."
        return sum(int((c.status & state) != 0) for c in self.l)

    def change_state(self, from_state, to_state):
        """Move every candidate matching FROM_STATE (a mask) to TO_STATE.

        Returns True if at least one candidate was changed.
        """
        any_changed = False
        for c in self.l:
            if (c.status & from_state) != 0:
                c.status = to_state
                # elect()/eliminate() emit the debug line; eliminate()
                # also zeroes the weight.
                if to_state == ELECTED:
                    c.elect()
                elif to_state == ELIMINATED:
                    c.eliminate()
                any_changed = True
        return any_changed

    def reverse_random(self):
        # Flip the values to create a different ordering among candidates. Note
        # that this will alternate the domain between [0.0, 1.0) and (0.0, 1.0]
        for c in self.l:
            c.rand = 1.0 - c.rand

    def adjust_weights(self, quota):
        "Rescale the weight of each ELECTED candidate against QUOTA."
        for c in self.l:
            if c.status == ELECTED:
                c.adjust_weight(quota)

    def print_results(self):
        "Print one line per candidate stating elected / not elected."
        for c in self.l:
            print('%-40s%selected' % (c.name,
                                      ' ' if c.status == ELECTED else ' not '))

    def dbg_display_tables(self, excess):
        "Debug-print each candidate's weight and vote, plus the totals."
        total = excess
        for c in self.l:
            dbg('%-20s %15.9f %15.9f', c.name, c.weight, c.vote)
            total += c.vote
        dbg('%-20s %15s %15.9f', 'Non-transferable', ' ', excess)
        dbg('%-20s %15s %15.9f', 'Total', ' ', total)

    def calc_aheads(self):
        "Recompute each candidate's .ahead value from the current votes."

        def compare(c1, c2):
            # Order by .ahead first, then by .vote.
            if c1.ahead < c2.ahead:
                return -1
            if c1.ahead == c2.ahead:
                ### expression for removed cmp() function
                return (c1.vote > c2.vote) - (c1.vote < c2.vote)
            return 1

        ### the algorithm below seems very obtuse, to produce very little
        ### change. It alters .ahead on the first iteration, then does
        ### minor tweaks afterwards. And it seems to only work pair-wise.
        ### It feels this could be simplified.
        ### refer to STV.java::calcAheads()

        c_sorted = sorted(self.l, key=functools.cmp_to_key(compare))
        last = 0
        for i in range(1, len(c_sorted) + 1):
            # A run of adjacent candidates with equal .vote ends at I;
            # every member of the run c_sorted[last:i] gets the same .ahead.
            if i == len(c_sorted) or c_sorted[last].vote != c_sorted[i].vote:
                for c in c_sorted[last:i]:
                    c.ahead = (i - 1) + last
                last = i

    def apply_votes(self, votes):
        """Distribute every vote in VOTES across the candidates.

        VOTES is a list of ordered lists of Candidate objects. Each
        candidate's .vote is reset and recomputed. Returns the total
        amount of vote that could not be assigned to any candidate.
        """
        # Reset each candidates vote 0.0
        for c in self.l:
            c.vote = 0.0

        # Each voter has 1 vote. Due to candidate weighting, it might not
        # get fully-assigned to candidates. We need to remember this excess.
        excess = 0.0

        # Now, process that 1 vote.
        for choices in votes:
            vote = 1.0

            # Distribute the vote, according to their ordered wishes.
            for c in choices:
                if c.status == HOPEFUL:
                    # A hopeful candidate takes all of what remains.
                    c.vote += vote
                    vote = 0.0
                    break
                if c.status != ELIMINATED:
                    wv = c.weight * vote  # weighted vote
                    c.vote += wv
                    vote -= wv
                    # Note: should probably test for floating point margins,
                    # but it's fine to just let this spill into the EXCESS
                    # value.
                    if vote == 0.0:  # nothing left to distribute
                        break

            excess += vote

        # Done. Tell caller what we could not distribute.
        return excess


class Candidate(object):
    """One candidate and its mutable tally state.

    Attributes:
      name:   the name this candidate was created with
      rand:   tiebreak value, compared in exclude_lowest()
      ahead:  ranking value maintained by CandidateList.calc_aheads()
      status: one of ELECTED, HOPEFUL, ELIMINATED, ALMOST
      weight: fraction of an incoming vote kept (see apply_votes())
      vote:   current vote total; None until apply_votes() has run
    """

    def __init__(self, name, rand, ahead):
        self.name = name
        self.rand = rand
        self.ahead = ahead
        self.status = HOPEFUL
        self.weight = 1.0
        self.vote = None  # calculated later

    def elect(self):
        "Mark this candidate as ELECTED."
        self.status = ELECTED
        dbg('Elected: %s', self.name)

    def eliminate(self):
        "Mark this candidate as ELIMINATED, and zero its weight."
        self.status = ELIMINATED
        self.weight = 0.0
        dbg('Eliminated: %s', self.name)

    def adjust_weight(self, quota):
        "Scale the weight by QUOTA / current vote."
        assert quota is not None
        self.weight = (self.weight * quota) / self.vote


def iterate_one(quota, votes, candidates, num_seats):
    """Perform one iteration of the tally.

    QUOTA is the quota returned by the prior iteration (None on the first
    pass). Returns the newly computed quota, or None when all remaining
    candidates were simply elected.
    """
    # assume that: count(ELECTED) < num_seats
    if candidates.count(ELECTED + HOPEFUL) <= num_seats:
        dbg('All remaining candidates elected')
        candidates.change_state(HOPEFUL, ELECTED)
        return None

    candidates.adjust_weights(quota)

    changed, new_quota, surplus = recalc(votes, candidates, num_seats)
    if not changed and surplus < ERROR_MARGIN:
        # Nothing moved and there is no surplus left to transfer, so
        # force an exclusion.
        dbg('Remove Lowest (forced)')
        exclude_lowest(candidates.l)

    return new_quota


def recalc(votes, candidates, num_seats):
    """Re-apply the votes, then elect and/or eliminate candidates.

    Returns a tuple: (any_changed, quota, surplus).
    """
    excess = candidates.apply_votes(votes)
    candidates.calc_aheads()
    candidates.dbg_display_tables(excess)

    quota = calc_quota(len(votes), excess, num_seats)
    any_changed = elect(quota, candidates, num_seats)
    surplus = calc_surplus(quota, candidates)
    any_changed |= try_remove_lowest(surplus, candidates)
    return any_changed, quota, surplus


#@deprecated
def calc_totals(votes, candidates):
    "Deprecated wrapper for CandidateList.apply_votes()."
    return candidates.apply_votes(votes)


#@deprecated
def calc_aheads(candidates):
    "Deprecated wrapper for CandidateList.calc_aheads()."
    candidates.calc_aheads()


def calc_quota(num_voters, excess, num_seats):
    """Return the quota: (NUM_VOTERS - EXCESS) / (NUM_SEATS + 1).

    For more than two seats, BILLIONTH is added to the divisor.
    Raises Exception if the resulting quota is below ERROR_MARGIN.
    """
    if num_seats > 2:
        quota = (float(num_voters) - excess) / (float(num_seats + 1) + BILLIONTH)
    else:
        quota = (float(num_voters) - excess) / float(num_seats + 1)
    if quota < ERROR_MARGIN:
        raise Exception('Internal Error - very low quota')
    dbg('Quota = %.9f', quota)
    return quota


def elect(quota, candidates, num_seats):
    """Elect the HOPEFUL candidates whose vote has reached QUOTA.

    Returns True if any candidate's state was changed.
    """
    # Provisionally mark those reaching quota, as there may be more of
    # them than seats.
    for c in candidates.l:
        if c.status == HOPEFUL and c.vote >= quota:
            c.status = ALMOST

    any_changed = False

    while candidates.count(ELECTED + ALMOST) > num_seats:
        dbg('Vote tiebreaker! voters: %d seats: %d',
            candidates.count(ELECTED + ALMOST), num_seats)
        candidates.change_state(HOPEFUL, ELIMINATED)
        exclude_lowest(candidates.l)
        any_changed = True  # we changed the candidates via exclude_lowest()

    any_changed |= candidates.change_state(ALMOST, ELECTED)
    return any_changed


def calc_surplus(quota, candidates):
    "Return the total vote held above QUOTA by the ELECTED candidates."
    surplus = sum(c.vote - quota for c in candidates.l if c.status == ELECTED)
    dbg('Total Surplus = %.9f', surplus)
    return surplus


def try_remove_lowest(surplus, candidates):
    """Eliminate the lowest HOPEFUL candidate if SURPLUS cannot save them.

    The lowest HOPEFUL candidate is eliminated when the gap up to the
    next-higher vote (among non-eliminated candidates) exceeds SURPLUS.
    Returns True if a candidate was eliminated.
    """
    lowest1 = 1e18
    lowest2 = 1e18
    which = None
    for c in candidates.l:
        if c.status == HOPEFUL and c.vote < lowest1:
            lowest1 = c.vote
            which = c
    for c in candidates.l:
        if c.status != ELIMINATED and c.vote > lowest1 and c.vote < lowest2:
            lowest2 = c.vote

    diff = lowest2 - lowest1
    if diff >= 0.0:
        dbg('Lowest Difference = %.9f - %.9f = %.9f', lowest2, lowest1, diff)

    # WHICH is None when there is no HOPEFUL candidate left; with a
    # negative SURPLUS the comparison alone would still pass, so guard
    # against calling .eliminate() on None.
    if which is not None and diff > surplus:
        dbg('Remove Lowest (unforced)')
        which.eliminate()
        return True
    return False


def exclude_lowest(candidates):
    """Eliminate the HOPEFUL/ALMOST candidate with the lowest .ahead.

    CANDIDATES is a plain list of Candidate objects. Ties on .ahead are
    broken by the lowest .rand value.
    """
    which = None
    use_rand = False

    for c in candidates:
        if c.status == HOPEFUL or c.status == ALMOST:
            if which is None or c.ahead < which.ahead:
                which = c
                use_rand = False
            elif c.ahead == which.ahead:
                use_rand = True
                if c.rand < which.rand:
                    which = c

    if use_rand:
        dbg('Random choice used!')

    assert which
    which.eliminate()


def generate_random(count):
    """Return a list of COUNT distinct random values.

    The module-level random generator is re-seeded with 0 on every call,
    so the result is repeatable.
    """
    random.seed(0)  ### choose a seed based on input? for now: repeatable.
    while True:
        # Generate COUNT values in [0.0, 1.0)
        # NOTE: use a list (not a set or dict) for repeatable ordering.
        values = [random.random() for x in range(count)]

        # Use a set() to check for dups. If no dups, then return the values.
        if len(set(values)) == count:
            return values


def dbg(fmt, *args):
    "Print FMT % ARGS, but only when VERBOSE is set."
    if VERBOSE:
        print(fmt % args)


def main(argv):
    """Command-line entry point. ARGV excludes the program name."""
    parser = argparse.ArgumentParser(description="Calculate a winner for a vote")

    parser.add_argument('raw_file')
    parser.add_argument("-s", "--seats", dest="seats", type=int,
                        help="Number of seats available, default 9",
                        default=9)
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
                        help="Enable verbose logging", default=False)

    args = parser.parse_args(argv)

    global VERBOSE
    VERBOSE = args.verbose

    if not os.path.exists(args.raw_file):
        parser.print_help()
        sys.exit(1)

    # Get mapping from vote label (typically "a" to "z") to human name.
    labelmap = read_nominees(args.raw_file)

    # Construct a label-sorted list of names from the labelmap.
    names = [name for _, name in sorted(labelmap.items())]

    # Turn votes using labels into by-name.
    if args.raw_file.endswith('.json'):
        ### noted on 2025-03-06:
        ### this appears totally broken. The prior-year raw JSON files have
        ### labels such as "AK" and "AB", yet the labels extracted from
        ### board_nominations.ini uses labels like "k" and "b".
        ### QUESTION: do new .json files have a mapping in them? eg. who is "AK"?
        votes_by_label = read_jsonvotes(args.raw_file)
        votes = [[labelmap[l.lower()] for l in vote.split()]
                 for vote in votes_by_label]
    else:
        votes_by_label = read_votefile(args.raw_file)
        votes = [[labelmap[l] for l in vote] for vote in votes_by_label]

    candidates = run_stv(names, votes, args.seats)
    candidates.print_results()
    print('Done!')


if __name__ == '__main__':
    main(sys.argv[1:])