pachi_py/pachi/distributed/merge.c
/* The master keeps stats received from slaves in a queue of received
* buffers that are merged together with the functions implemented
* here. It also has one hash table per slave to maintain cumulative
* stats that have not yet been sent to the slave machine. The queue
* and the hash tables are cleared at each new move. */
#include <assert.h>
#include <stdio.h>
#include <limits.h>
#define DEBUG
#include "debug.h"
#include "timeinfo.h"
#include "distributed/distributed.h"
#include "distributed/merge.h"
/* We merge together debug stats for all hash tables. */
static struct hash_counts h_counts;
/* Display and reset hash statistics. For debugging only. */
void
merge_print_stats(int total_hnodes)
{
if (DEBUGL(3)) {
char buf[BSIZE];
snprintf(buf, sizeof(buf),
"stats occupied %ld %.1f%% inserts %ld collisions %ld/%ld %.1f%%\n",
h_counts.occupied, h_counts.occupied * 100.0 / total_hnodes,
h_counts.inserts, h_counts.collisions, h_counts.lookups,
h_counts.collisions * 100.0 / (h_counts.lookups + 1));
logline(NULL, "* ", buf);
}
if (DEBUG_MODE) h_counts.occupied = 0;
}
/* We maintain counts per bucket to avoid sorting large arrays.
* All nodes with n updates since last send go to bucket n.
* We have at most max_merged_nodes = (max_slaves-1) * shared_nodes
* nodes to merge, 230K nodes for 24 slaves. If we put all nodes above
* 1K updates in the top bucket, we get at most 230 nodes in this
* bucket. So we can select exactly the best shared_nodes nodes if
* shared_nodes >= 230. In practice there is overlap between
* nodes sent by different slaves so shared_nodes can be lower. */
#define MAX_BUCKETS 1024
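/* A worked example of the bucket selection performed by output_stats()
* below (numbers invented for illustration): with shared_nodes = 3 and
* the only non-empty buckets bucket_count[9] = 1, bucket_count[7] = 2
* and bucket_count[5] = 4, the scan down from the top bucket reaches
* out_count = 1 at increment 9 and 3 at increment 7, so min_incr = 7:
* the node with 9 updates and both nodes with 7 updates are sent, and
* bucket 5 keeps accumulating in the hash table until a later send. */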
/* Update the hash table for the given increment stats,
* and increment the bucket count. Return the hash index.
* The slave lock is not held on either entry or exit of this function. */
static inline int
stats_tally(struct incr_stats *s, struct slave_state *sstate, int *bucket_count)
{
int h;
bool found;
struct incr_stats *stats_htable = sstate->stats_htable;
find_hash(h, stats_htable, sstate->stats_hbits, s->coord_path, found, h_counts);
if (found) {
assert(stats_htable[h].incr.playouts > 0);
stats_add_result(&stats_htable[h].incr, s->incr.value, s->incr.playouts);
} else {
stats_htable[h] = *s;
if (DEBUG_MODE) h_counts.inserts++, h_counts.occupied++;
}
int incr = stats_htable[h].incr.playouts;
if (incr >= MAX_BUCKETS) incr = MAX_BUCKETS - 1;
bucket_count[incr]++;
return h;
}
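/* Note on the clamp in stats_tally(): a node whose cumulative increment
* reaches, say, 1500 playouts is counted in the top bucket
* MAX_BUCKETS-1 = 1023 together with every other node above ~1K updates,
* matching the sizing argument above the MAX_BUCKETS definition. */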
static struct incr_stats terminator = { .coord_path = INT64_MAX };
/* Initialize the next pointers (see merge_new_stats()).
* Exclude invalid buffers and my own buffers by setting their next pointer
* to a terminator value. Update min if there are too many nodes to merge,
* so that merge time remains reasonable and the merge buffer doesn't overflow.
* (We skip the oldest buffers if the slave thread is too far behind. It is
* more important to get frequent incomplete updates than late complete updates.)
* Return the total number of nodes to be merged.
* The slave lock is not held on either entry or exit of this function. */
static int
filter_buffers(struct slave_state *sstate, struct incr_stats **next,
int *min, int max)
{
int size = 0;
int max_size = sstate->max_merged_nodes * sizeof(struct incr_stats);
for (int q = max; q >= *min; q--) {
if (!receive_queue[q] || receive_queue[q]->owner == sstate->thread_id) {
next[q] = &terminator;
} else if (size + receive_queue[q]->size > max_size) {
*min = q + 1;
assert(*min <= max);
break;
} else {
next[q] = (struct incr_stats *)receive_queue[q]->buf;
size += receive_queue[q]->size;
}
}
return size / sizeof(struct incr_stats);
}
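/* Example of the size cap above (figures invented): with max_merged_nodes
* equivalent to 230K nodes, if receive_queue[4..7] each hold 100K foreign
* nodes, the scan accepts q = 7 and q = 6 (200K nodes), then stops at
* q = 5 since 300K would overflow, so *min becomes 6 and the two oldest
* buffers are skipped. */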
/* Return the minimum coord path of next[min..max].
* This implementation is optimized for small values of max - min,
* which is the case if slaves are not too far behind.
* A heap (priority queue) could be used otherwise.
* The returned value may come from a buffer that has been
* invalidated; the caller must check for this. In that case
* the returned value is < the correct value. */
static inline path_t
min_coord(struct incr_stats **next, int min, int max)
{
path_t min_c = next[min]->coord_path;
for (int q = min + 1; q <= max; q++) {
if (next[q]->coord_path < min_c)
min_c = next[q]->coord_path;
}
return min_c;
}
/* Merge all valid incremental stats in receive_queue[min..max],
* update the hash table, set the bucket counts, and save the
* list of updated hash table entries. The input buffers and
* the output buffer are all sorted by increasing coord path.
* The input buffers end with a terminator value INT64_MAX.
* Return the number of updated hash table entries. */
/* The slave lock is not held on either entry or exit of this function,
* so receive_queue entries may be invalidated while we scan them.
* The receive queue might grow while we scan it but we ignore
* entries above max, they will be processed at the next call.
* This function does not modify the receive queue. */
static int
merge_new_stats(struct slave_state *sstate, int min, int max,
int *bucket_count, int *nodes_read, int last_queue_age)
{
*nodes_read = 0;
if (max < min) return 0;
/* next[q] is the next value to be checked in receive_queue[q]->buf.
* next is biased by min so that next[min..max] indexes into next_[]. */
struct incr_stats *next_[max - min + 1];
struct incr_stats **next = next_ - min;
*nodes_read = filter_buffers(sstate, next, &min, max);
/* prev_min_c is only used for debugging. */
path_t prev_min_c = 0;
/* Do N-way merge, processing one coord path per iteration.
* If the minimum coord is INT64_MAX, either all buffers are
* invalidated, or at least one is valid and we are at the
* end of all valid buffers. In both cases we're done. */
int merge_count = 0;
path_t min_c;
while ((min_c = min_coord(next, min, max)) != INT64_MAX) {
struct incr_stats sum = { .coord_path = min_c,
.incr = { .playouts = 0, .value = 0.0 }};
for (int q = min; q <= max; q++) {
struct incr_stats s = *(next[q]);
/* If s.coord_path != min_c, we must skip s.coord_path for now.
* If min_c is invalid (read from a buffer that was being
* overwritten), a future call of min_coord() will return a
* stable value, so at some point we will get s.coord_path == min_c
* and we will not loop forever. */
if (s.coord_path != min_c) continue;
/* We check the buffer validity after s.coord_path has been
* compared to avoid a race condition, and also to avoid multiple
* useless checks for the same coord_path. */
if (unlikely(!receive_queue[q])) {
next[q] = &terminator;
continue;
}
/* Stop if we have a new move. If queue_age is incremented
* after this check, the merged output will be discarded. */
if (unlikely(queue_age > last_queue_age)) return 0;
/* s.coord_path is valid here, so min_c is valid too.
* (An invalid min_c would be < s.coord_path.) */
assert(min_c > prev_min_c);
assert(s.coord_path && s.incr.playouts);
stats_add_result(&sum.incr, s.incr.value, s.incr.playouts);
next[q]++;
}
/* All the buffers containing min_c may have been invalidated,
* so sum may still be zero. But in this case the next[q] pointers
* that contained min_c have been reset to &terminator, so we will
* not loop forever. */
if (!sum.incr.playouts) continue;
assert(min_c > prev_min_c);
if (DEBUG_MODE) prev_min_c = min_c;
/* At this point sum contains only valid increments,
* so we can add it to the hash table. */
assert(merge_count < sstate->max_merged_nodes);
sstate->merged[merge_count++] = stats_tally(&sum, sstate, bucket_count);
}
return merge_count;
}
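/* A minimal sketch of the same N-way merge on already-terminated
* buffers, without the invalidation, locking and bucket bookkeeping
* handled above. Illustrative only (not compiled); toy_merge and its
* signature are our invention, not part of the real interface. */
#if 0
static int
toy_merge(struct incr_stats **next, int nbuf, struct incr_stats *out)
{
	int count = 0;
	for (;;) {
		/* Find the smallest coord path still pending. */
		path_t min_c = next[0]->coord_path;
		for (int q = 1; q < nbuf; q++)
			if (next[q]->coord_path < min_c)
				min_c = next[q]->coord_path;
		if (min_c == INT64_MAX) return count; /* all buffers exhausted */
		/* Sum the increments of all buffers holding min_c. */
		struct incr_stats sum = { .coord_path = min_c,
					  .incr = { .playouts = 0, .value = 0.0 }};
		for (int q = 0; q < nbuf; q++) {
			if (next[q]->coord_path != min_c) continue;
			stats_add_result(&sum.incr, next[q]->incr.value,
					 next[q]->incr.playouts);
			next[q]++; /* consume this entry */
		}
		out[count++] = sum;
	}
}
#endif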
/* Save in buf the best increments from other slaves merged previously.
* To avoid a costly scan of the entire hash table we only send nodes
* that were recently sent by other slaves. It is possible
* but very unlikely that the hash table contains some nodes with
* a higher number of playouts.
* Return the number of nodes to be sent.
* The slave lock is not held on either entry or exit of this function. */
static int
output_stats(struct incr_stats *buf, struct slave_state *sstate,
int *bucket_count, int merge_count)
{
/* Find the minimum increment to send. The bucket with minimum
* increment may be sent only partially. */
int out_count = 0;
int min_incr = MAX_BUCKETS;
int shared_nodes = sstate->max_buf_size / sizeof(*buf);
do {
out_count += bucket_count[--min_incr];
} while (min_incr > 1 && out_count < shared_nodes);
/* Send all increments > min_incr plus whatever we can at min_incr. */
int min_count = bucket_count[min_incr] - (out_count - shared_nodes);
out_count = 0;
int *merged = sstate->merged;
struct incr_stats *stats_htable = sstate->stats_htable;
while (merge_count--) {
int h = *merged++;
int delta = stats_htable[h].incr.playouts - min_incr;
if (delta < 0 || (delta == 0 && --min_count < 0)) continue;
assert (out_count < shared_nodes);
buf[out_count++] = stats_htable[h];
/* Clear the hash table entry. (We could instead
* just clear the playouts but clearing the entry
* leads to fewer collisions later.) */
stats_htable[h].coord_path = 0;
if (DEBUG_MODE) h_counts.occupied--;
}
/* The slave expects increments sorted by coord path,
* and they are already sorted here. */
return out_count;
}
/* Get all incremental stats received from other slaves since the
* last send. Store in buf the stats with largest playout increments.
* Return the byte size of the resulting buffer. The caller must
* check that the result is still valid.
* The slave lock is held on both entry and exit of this function. */
static int
get_new_stats(struct incr_stats *buf, struct slave_state *sstate, int cmd_id)
{
/* Process all valid buffers in receive_queue[min..max] */
int min = sstate->last_processed + 1;
int max = queue_length - 1;
if (max < min && cmd_id == sstate->stats_id) return 0;
sstate->last_processed = max;
int last_queue_age = queue_age;
/* It takes time to clear the hash table and merge the stats
* so do this unlocked. */
protocol_unlock();
double start = time_now();
double clear_time = 0;
/* Clear the hash table at a new move; the old paths in
* the hash table are now meaningless. */
if (cmd_id != sstate->stats_id) {
memset(sstate->stats_htable, 0,
(1 << sstate->stats_hbits) * sizeof(sstate->stats_htable[0]));
sstate->stats_id = cmd_id;
clear_time = time_now() - start;
}
/* Set the bucket counts and update the hash table stats. */
int bucket_count[MAX_BUCKETS];
memset(bucket_count, 0, sizeof(bucket_count));
int nodes_read;
int merge_count = merge_new_stats(sstate, min, max, bucket_count,
&nodes_read, last_queue_age);
int missed = 0;
if (DEBUG_MODE)
for (int q = min; q <= max; q++) missed += !receive_queue[q];
/* Put the best increments in the output buffer. */
int output_nodes = output_stats(buf, sstate, bucket_count, merge_count);
if (DEBUGVV(2)) {
char b[1024];
snprintf(b, sizeof(b), "merged %d..%d missed %d %d/%d nodes,"
" output %d/%d nodes in %.3fms (clear %.3fms)\n",
min, max, missed, merge_count, nodes_read, output_nodes,
sstate->max_buf_size / (int)sizeof(*buf),
(time_now() - start)*1000, clear_time*1000);
logline(&sstate->client, "= ", b);
}
protocol_lock();
return output_nodes * sizeof(*buf);
}
/* Allocate the buffers in the merge-specific part of the slave state,
* and reserve space for a terminator value (see merge_insert_hook()). */
static void
merge_state_alloc(struct slave_state *sstate)
{
sstate->stats_htable = calloc2(1 << sstate->stats_hbits, sizeof(struct incr_stats));
sstate->merged = malloc2(sstate->max_merged_nodes * sizeof(int));
sstate->max_buf_size -= sizeof(struct incr_stats);
}
/* Append a terminator value to make merge_new_stats() more
* efficient. merge_state_alloc() has reserved enough space. */
static void
merge_insert_hook(struct incr_stats *buf, int size)
{
int nodes = size / sizeof(*buf);
buf[nodes].coord_path = INT64_MAX;
}
/* Initialize merge-related fields of the default slave state. */
void
merge_init(struct slave_state *sstate, int shared_nodes, int stats_hbits, int max_slaves)
{
/* See merge_state_alloc() for shared_nodes + 1 */
sstate->max_buf_size = (shared_nodes + 1) * sizeof(struct incr_stats);
sstate->stats_hbits = stats_hbits;
sstate->insert_hook = (buffer_hook)merge_insert_hook;
sstate->alloc_hook = merge_state_alloc;
sstate->args_hook = (getargs_hook)get_new_stats;
/* At worst one late slave thread may have to merge up to
* shared_nodes * BUFFERS_PER_SLAVE * (max_slaves - 1)
* nodes, but on average it should not have to merge more than
* shared_nodes * (max_slaves - 1).
* Restricting the maximum number of merged nodes to the latter avoids
* spending excessive time on the merge. */
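/* Example, using the figures from the comment above MAX_BUCKETS: with
* max_slaves = 24 and shared_nodes = 10K, the cap below comes to
* 23 * 10K = 230K nodes per call. */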
sstate->max_merged_nodes = shared_nodes * (max_slaves - 1);
}