pachi_py/pachi/distributed/protocol.c (441 lines of code) (raw):
/* The functions implementing the master-slave protocol of the
* distributed engine are grouped here. They are independent
* of the gtp protocol. See the comments at the top of distributed.c
* for a general introduction to the distributed engine. */
/* The receive queue is an array of pointers to binary buffers.
* These pointers are invalidated in one of two ways when a buffer
* is recycled: (1) the queue age is increased when the queue is
* emptied at a new move, (2) the pointer itself is set to NULL
* immmediately, and stays so until at least the next queue age
* increment. */
#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include <ctype.h>
#include <unistd.h>
#define DEBUG
#include "random.h"
#include "timeinfo.h"
#include "playout.h"
#include "network.h"
#include "debug.h"
#include "distributed/distributed.h"
#include "distributed/protocol.h"
/* All gtp commands for current game separated by \n */
static char gtp_cmds[CMDS_SIZE];
/* Latest gtp command sent to slaves. */
static char *gtp_cmd = NULL;
/* Slaves send gtp_cmd when cmd_count changes. */
static int cmd_count = 0;
/* Remember at most 10 gtp ids per move: kgs-rules, boardsize, clear_board,
* time_settings, komi, handicap, genmoves, play pass, play pass, final_status_list */
#define MAX_CMDS_PER_MOVE 10
/* History of gtp commands sent for current game, indexed by move. */
static struct cmd_history {
int gtp_id;
char *next_cmd;
} history[MAX_GAMELEN][MAX_CMDS_PER_MOVE];
/* Number of active slave machines working for this master. */
int active_slaves = 0;
/* Number of replies to last gtp command already received. */
int reply_count = 0;
/* All replies to latest gtp command are in gtp_replies[0..reply_count-1]. */
char **gtp_replies;
struct buf_state **receive_queue;
int queue_length = 0;
int queue_age = 0;
static int queue_max_length;
/* Mutex protecting all variables above. receive_queue may be
* read without the lock but is only written with lock held. */
static pthread_mutex_t slave_lock = PTHREAD_MUTEX_INITIALIZER;
/* Condition signaled when a new gtp command is available. */
static pthread_cond_t cmd_cond = PTHREAD_COND_INITIALIZER;
/* Condition signaled when reply_count increases. */
static pthread_cond_t reply_cond = PTHREAD_COND_INITIALIZER;
/* Mutex protecting stderr. Must not be held at same time as slave_lock. */
static pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER;
/* Absolute time when this program was started.
* For debugging only. */
static double start_time;
/* Default slave state. */
struct slave_state default_sstate;
/* Get exclusive access to the threads and commands state. */
void
protocol_lock(void)
{
pthread_mutex_lock(&slave_lock);
}
/* Release exclusive access to the threads and commands state. */
void
protocol_unlock(void)
{
pthread_mutex_unlock(&slave_lock);
}
/* Write the time, client address, prefix, and string s to stderr atomically.
* s should end with a \n */
void
logline(struct in_addr *client, char *prefix, char *s)
{
double now = time_now();
char addr[INET_ADDRSTRLEN];
if (client) {
#ifdef _WIN32
strcpy(addr, inet_ntoa(*client));
#else
inet_ntop(AF_INET, client, addr, sizeof(addr));
#endif
} else {
addr[0] = '\0';
}
pthread_mutex_lock(&log_lock);
fprintf(stderr, "%s%15s %9.3f: %s", prefix, addr, now - start_time, s);
pthread_mutex_unlock(&log_lock);
}
/* Thread opening a connection on the given socket and copying input
* from there to stderr. */
static void * __attribute__((noreturn))
proxy_thread(void *arg)
{
int proxy_sock = (intptr_t)arg;
assert(proxy_sock >= 0);
for (;;) {
struct in_addr client;
int conn = open_server_connection(proxy_sock, &client);
FILE *f = fdopen(conn, "r");
char buf[BSIZE];
while (fgets(buf, BSIZE, f)) {
logline(&client, "< ", buf);
}
fclose(f);
}
pthread_exit(NULL);
}
/* Get a reply to one gtp command. Return the gtp command id,
* or -1 if error. reply must have at least CMDS_SIZE bytes.
* The ascii reply ends with an empty line; if the first line
* contains "@size", a binary reply of size bytes follows the
* empty line. @size is not standard gtp, it is only used
* internally by Pachi for the genmoves command; it must be the
* last parameter on the line.
* *bin_size is the maximum size upon entry, actual size on return.
* slave_lock is not held on either entry or exit of this function. */
static int
get_reply(FILE *f, struct in_addr client, char *reply, void *bin_reply, int *bin_size)
{
double start = time_now();
int reply_id = -1;
*reply = '\0';
if (!fgets(reply, CMDS_SIZE, f)) return -1;
/* Check for binary reply. */
char *s = strchr(reply, '@');
int size = 0;
if (s) size = atoi(s+1);
assert(size <= *bin_size);
*bin_size = size;
if (DEBUGV(s, 2))
logline(&client, "<<", reply);
if ((*reply == '=' || *reply == '?') && isdigit(reply[1]))
reply_id = atoi(reply+1);
/* Read the rest of the ascii reply */
char *line = reply + strlen(reply);
while (fgets(line, reply + CMDS_SIZE - line, f) && *line != '\n') {
if (DEBUGL(3))
logline(&client, "<<", line);
line += strlen(line);
}
if (*line != '\n') return -1;
/* Read the binary reply if any. */
int len;
while (size && (len = fread(bin_reply, 1, size, f)) > 0) {
bin_reply = (char *)bin_reply + len;
size -= len;
}
if (*bin_size && DEBUGVV(2)) {
char buf[1024];
snprintf(buf, sizeof(buf), "read reply %d+%d bytes in %.4fms\n",
(int)strlen(reply), *bin_size,
(time_now() - start)*1000);
logline(&client, "= ", buf);
}
return size ? -1 : reply_id;
}
/* Send the gtp command to_send and get a reply from the slave machine.
* Write the reply in buf which must have at least CMDS_SIZE bytes.
* If *bin_size > 0, send bin_buf after the gtp command.
* Return any binary reply in bin_buf and set its size in bin_size.
* bin_buf is private to the slave and need not be copied.
* Return the gtp command id, or -1 if error.
* slave_lock is held on both entry and exit of this function. */
static int
send_command(char *to_send, void *bin_buf, int *bin_size,
FILE *f, struct slave_state *sstate, char *buf)
{
assert(to_send && gtp_cmd && bin_buf && bin_size);
strncpy(buf, to_send, CMDS_SIZE);
bool resend = to_send != gtp_cmd;
pthread_mutex_unlock(&slave_lock);
if (DEBUGL(1) && resend)
logline(&sstate->client, "? ",
to_send == gtp_cmds ? "resend all\n" : "partial resend\n");
double start = time_now();
fputs(buf, f);
if (*bin_size)
fwrite(bin_buf, 1, *bin_size, f);
fflush(f);
if (DEBUGV(strchr(buf, '@'), 2)) {
double ms = (time_now() - start) * 1000.0;
if (!DEBUGL(3)) {
char *s = strchr(buf, '\n');
if (s) s[1] = '\0';
}
logline(&sstate->client, ">>", buf);
if (*bin_size) {
char b[1024];
snprintf(b, sizeof(b),
"sent cmd %d+%d bytes in %.4fms\n",
(int)strlen(buf), *bin_size, ms);
logline(&sstate->client, "= ", b);
}
}
/* Reuse the buffers for the reply. */
*bin_size = sstate->max_buf_size;
int reply_id = get_reply(f, sstate->client, buf, bin_buf, bin_size);
pthread_mutex_lock(&slave_lock);
return reply_id;
}
/* Return the command sent after that with the given gtp id,
* or gtp_cmds if the id wasn't used in this game. If a play command
* has overwritten a genmoves command, return the play command.
* slave_lock is held on both entry and exit of this function. */
static char *
next_command(int cmd_id)
{
if (cmd_id == -1) return gtp_cmds;
int last_id = atoi(gtp_cmd);
int reply_move = move_number(cmd_id);
if (reply_move > move_number(last_id)) return gtp_cmds;
int slot;
for (slot = 0; slot < MAX_CMDS_PER_MOVE; slot++) {
if (cmd_id == history[reply_move][slot].gtp_id) break;
}
if (slot == MAX_CMDS_PER_MOVE) return gtp_cmds;
char *next = history[reply_move][slot].next_cmd;
assert(next);
return next;
}
/* Allocate buffers for a slave thread. The state should have been
* initialized already as a copy of the default slave state.
* slave_lock is not held on either entry or exit of this function. */
static void
slave_state_alloc(struct slave_state *sstate)
{
for (int n = 0; n < BUFFERS_PER_SLAVE; n++) {
sstate->b[n].buf = malloc2(sstate->max_buf_size);
sstate->b[n].owner = sstate->thread_id;
}
if (sstate->alloc_hook) sstate->alloc_hook(sstate);
}
/* Get a free binary buffer, first invalidating it in the receive
* queue if necessary. In practice all buffers should be used
* before they are invalidated, if BUFFERS_PER_SLAVE is large enough.
* slave_lock is held on both entry and exit of this function. */
static void *
get_free_buf(struct slave_state *sstate)
{
int newest = (sstate->newest_buf + 1) & (BUFFERS_PER_SLAVE - 1);
sstate->newest_buf = newest;
void *buf = sstate->b[newest].buf;
if (DEBUGVV(7)) {
char b[1024];
snprintf(b, sizeof(b),
"get free %d index %d buf=%p age %d qlength %d\n", newest,
sstate->b[newest].queue_index, buf, queue_age, queue_length);
logline(&sstate->client, "? ", b);
}
int index = sstate->b[newest].queue_index;
if (index < 0) return buf;
/* Invalidate the buffer if the calling thread still owns its previous
* entry in the receive queue. The entry may have been overwritten by
* another thread, but only after a new move which invalidates the
* entire receive queue. */
if (receive_queue[index] && receive_queue[index]->owner == sstate->thread_id) {
receive_queue[index] = NULL;
}
sstate->b[newest].queue_index = -1;
return buf;
}
/* Insert a buffer in the receive queue. It should be the most
* recent buffer allocated by the calling thread.
* slave_lock is held on both entry and exit of this function. */
static void
insert_buf(struct slave_state *sstate, void *buf, int size)
{
assert(queue_length < queue_max_length);
int newest = sstate->newest_buf;
assert(buf == sstate->b[newest].buf);
/* Update the buffer if necessary before making it
* available to other threads. */
if (sstate->insert_hook) sstate->insert_hook(buf, size);
if (DEBUGVV(7)) {
char b[1024];
snprintf(b, sizeof(b),
"insert newest %d age %d rq[%d]->%p owner %d\n",
newest, queue_age, queue_length, buf, sstate->thread_id);
logline(&sstate->client, "? ", b);
}
receive_queue[queue_length] = &sstate->b[newest];
receive_queue[queue_length]->size = size;
receive_queue[queue_length]->queue_index = queue_length;
queue_length++;
}
/* Clear the receive queue. The buffer pointers do not have to be cleared
* here, this is done as each buffer is recycled.
* slave_lock is held on both entry and exit of this function. */
void
clear_receive_queue(void)
{
if (DEBUGL(3)) {
char buf[1024];
snprintf(buf, sizeof(buf), "clear queue, old length %d age %d\n",
queue_length, queue_age);
logline(NULL, "? ", buf);
}
queue_length = 0;
queue_age++;
}
/* Process the reply received from a slave machine.
* Copy the ascii part to reply_buf and insert the binary part
* (if any) in the receive queue.
* Return false if ok, true if the slave is out of sync.
* slave_lock is held on both entry and exit of this function. */
static bool
process_reply(int reply_id, char *reply, char *reply_buf,
void *bin_reply, int bin_size, int *last_reply_id,
int *reply_slot, struct slave_state *sstate)
{
/* Resend everything if slave returned an error. */
/* FIXME: this often results in infinite loops on errors
* not caused by syncing. These should be reported from
* the distributed engine. */
if (*reply != '=') {
*last_reply_id = -1;
return true;
}
/* Make sure we are still in sync. cmd_count may have
* changed but the reply is valid as long as cmd_id didn't
* change (this only occurs for consecutive genmoves). */
int cmd_id = atoi(gtp_cmd);
if (reply_id != cmd_id) {
*last_reply_id = reply_id;
return true;
}
strncpy(reply_buf, reply, CMDS_SIZE);
if (reply_id != *last_reply_id)
*reply_slot = reply_count++;
gtp_replies[*reply_slot] = reply_buf;
if (bin_size) insert_buf(sstate, bin_reply, bin_size);
pthread_cond_signal(&reply_cond);
*last_reply_id = reply_id;
return false;
}
/* Get the binary arg for the given command, and update the command
* if necessary. For now, only genmoves has a binary argument, and
* we return the best stats increments from all other slaves.
* Set *bin_size to 0 if the command doesn't take binary arguments,
* but still return a buffer, to be used for the reply.
* Return NULL if the binary arg is obsolete by the time we have
* finished computing it, because a new command is available.
* This version only gets the buffer for the reply, to be completed
* in future commits.
* slave_lock is held on both entry and exit of this function. */
void *
get_binary_arg(struct slave_state *sstate, char *cmd, int cmd_size, int *bin_size)
{
int cmd_id = atoi(gtp_cmd);
void *buf = get_free_buf(sstate);
*bin_size = 0;
char *s = strchr(cmd, '@');
if (!s || !sstate->args_hook) return buf;
int size = sstate->args_hook(buf, sstate, cmd_id);
/* Check that the command is still valid. */
if (atoi(gtp_cmd) != cmd_id) return NULL;
/* Set the correct binary size for this slave.
* cmd may have been overwritten with new parameters. */
*bin_size = size;
s = strchr(cmd, '@');
assert(s);
snprintf(s, cmd + cmd_size - s, "@%d\n", size);
return buf;
}
/* Main loop of a slave thread.
* Send the current command to the slave machine and wait for a reply.
* Resend command history if the slave machine is out of sync.
* Returns when the connection with the slave machine is cut.
* slave_lock is held on both entry and exit of this function. */
static void
slave_loop(FILE *f, char *reply_buf, struct slave_state *sstate, bool resend)
{
char *to_send;
int last_cmd_count = 0;
int last_reply_id = -1;
int reply_slot = -1;
for (;;) {
if (resend) {
/* Resend complete or partial history */
to_send = next_command(last_reply_id);
} else {
/* Wait for a new command. */
while (last_cmd_count == cmd_count)
pthread_cond_wait(&cmd_cond, &slave_lock);
to_send = gtp_cmd;
}
/* Command available, send it to slave machine.
* If slave was out of sync, send the history.
* But first get binary arguments if necessary. */
int bin_size = 0;
void *bin_buf = get_binary_arg(sstate, gtp_cmd,
gtp_cmds + CMDS_SIZE - gtp_cmd,
&bin_size);
/* Check that the command is still valid. */
resend = true;
if (!bin_buf) continue;
/* Send the command and get the reply, which always ends with \n\n
* The slave machine sends "=id reply" or "?id reply"
* with id == cmd_id if it is in sync. */
last_cmd_count = cmd_count;
char buf[CMDS_SIZE];
int reply_id = send_command(to_send, bin_buf, &bin_size, f,
sstate, buf);
if (reply_id == -1) return;
resend = process_reply(reply_id, buf, reply_buf, bin_buf, bin_size,
&last_reply_id, &reply_slot, sstate);
}
}
/* Minimimal check of slave identity. Close the file if error. */
static bool
is_pachi_slave(FILE *f, struct in_addr *client)
{
char buf[1024];
fputs("name\n", f);
if (!fgets(buf, sizeof(buf), f)
|| strncasecmp(buf, "= Pachi", 7)
|| !fgets(buf, sizeof(buf), f)
|| strcmp(buf, "\n")) {
logline(client, "? ", "bad slave\n");
fclose(f);
sleep(1); // avoid busy loop if error
return false;
}
return true;
}
/* Thread sending gtp commands to one slave machine, and
* reading replies. If a slave machine dies, this thread waits
* for a connection from another slave.
* The large buffers are allocated only once we get a first
* connection, to avoid wasting memory if max_slaves is too large.
* We do not invalidate the received buffers if a slave disconnects;
* they are still useful for other slaves. */
static void * __attribute__((noreturn))
slave_thread(void *arg)
{
struct slave_state sstate = default_sstate;
sstate.thread_id = (intptr_t)arg;
assert(sstate.slave_sock >= 0);
char reply_buf[CMDS_SIZE];
bool resend = false;
for (;;) {
/* Wait for a connection from any slave. */
struct in_addr client;
int conn = open_server_connection(sstate.slave_sock, &client);
FILE *f = fdopen(conn, "r+");
if (DEBUGL(2)) {
snprintf(reply_buf, sizeof(reply_buf),
"new slave, id %d\n", sstate.thread_id);
logline(&client, "= ", reply_buf);
}
if (!is_pachi_slave(f, &client)) continue;
if (!resend) slave_state_alloc(&sstate);
sstate.client = client;
pthread_mutex_lock(&slave_lock);
active_slaves++;
slave_loop(f, reply_buf, &sstate, resend);
assert(active_slaves > 0);
active_slaves--;
// Unblock main thread if it was waiting for this slave.
pthread_cond_signal(&reply_cond);
pthread_mutex_unlock(&slave_lock);
resend = true;
if (DEBUGL(2))
logline(&client, "= ", "lost slave\n");
fclose(f);
}
pthread_exit(NULL);
}
/* Create a new gtp command for all slaves. The slave lock is held
* upon entry and upon return, so the command will actually be
* sent when the lock is released. The last command is overwritten
* if gtp_cmd points to a non-empty string. cmd is a single word;
* args has all arguments and is empty or has a trailing \n */
void
update_cmd(struct board *b, char *cmd, char *args, bool new_id)
{
assert(gtp_cmd);
/* To make sure the slaves are in sync, we ignore the original id
* and use the board number plus some random bits as gtp id. */
static int gtp_id = -1;
int moves = is_reset(cmd) ? 0 : b->moves;
if (new_id) {
int prev_id = gtp_id;
do {
/* fast_random() is 16-bit only so the multiplication can't overflow. */
gtp_id = force_reply(moves + fast_random(65535) * DIST_GAMELEN);
} while (gtp_id == prev_id);
reply_count = 0;
}
snprintf(gtp_cmd, gtp_cmds + CMDS_SIZE - gtp_cmd, "%d %s %s",
gtp_id, cmd, *args ? args : "\n");
cmd_count++;
/* Remember history for out-of-sync slaves. */
static int slot = 0;
static struct cmd_history *last = NULL;
if (new_id) {
if (last) last->next_cmd = gtp_cmd;
slot = (slot + 1) % MAX_CMDS_PER_MOVE;
last = &history[moves][slot];
last->gtp_id = gtp_id;
last->next_cmd = NULL;
}
// Notify the slave threads about the new command.
pthread_cond_broadcast(&cmd_cond);
}
/* Update the command history, then create a new gtp command
* for all slaves. The slave lock is held upon entry and
* upon return, so the command will actually be sent when the
* lock is released. cmd is a single word; args has all
* arguments and is empty or has a trailing \n */
void
new_cmd(struct board *b, char *cmd, char *args)
{
// Clear the history when a new game starts:
if (!gtp_cmd || is_gamestart(cmd)) {
gtp_cmd = gtp_cmds;
memset(history, 0, sizeof(history));
} else {
/* Preserve command history for new slaves.
* To indicate that the slave should only reply to
* the last command we force the id of previous
* commands to be just the move number. */
int id = prevent_reply(atoi(gtp_cmd));
int len = strspn(gtp_cmd, "0123456789");
char buf[32];
snprintf(buf, sizeof(buf), "%0*d", len, id);
memcpy(gtp_cmd, buf, len);
gtp_cmd += strlen(gtp_cmd);
}
// Let the slave threads send the new gtp command:
update_cmd(b, cmd, args, true);
}
/* Wait for at least one new reply. Return when at least
* min_replies slaves have already replied, or when the
* given absolute time is passed.
* The replies are returned in gtp_replies[0..reply_count-1]
* slave_lock is held on entry and on return. */
void
get_replies(double time_limit, int min_replies)
{
for (;;) {
if (reply_count > 0) {
struct timespec ts;
double sec;
ts.tv_nsec = (int)(modf(time_limit, &sec)*1000000000.0);
ts.tv_sec = (int)sec;
pthread_cond_timedwait(&reply_cond, &slave_lock, &ts);
} else {
pthread_cond_wait(&reply_cond, &slave_lock);
}
if (reply_count == 0) continue;
if (reply_count >= min_replies || reply_count >= active_slaves) return;
if (time_now() >= time_limit) break;
}
if (DEBUGL(1)) {
char buf[1024];
snprintf(buf, sizeof(buf),
"get_replies timeout %.3f >= %.3f, replies %d < min %d, active %d\n",
time_now() - start_time, time_limit - start_time,
reply_count, min_replies, active_slaves);
logline(NULL, "? ", buf);
}
assert(reply_count > 0);
}
/* In a 5mn move with at least 5ms per genmoves we get at most
* 300*200=60000 genmoves per slave. */
#define MAX_GENMOVES_PER_SLAVE 60000
/* Allocate the receive queue, and create the slave and proxy threads.
* max_buf_size and the merge-related fields of default_sstate must
* already be initialized. */
void
protocol_init(char *slave_port, char *proxy_port, int max_slaves)
{
start_time = time_now();
queue_max_length = max_slaves * MAX_GENMOVES_PER_SLAVE;
receive_queue = calloc2(queue_max_length, sizeof(*receive_queue));
default_sstate.slave_sock = port_listen(slave_port, max_slaves);
default_sstate.last_processed = -1;
for (int n = 0; n < BUFFERS_PER_SLAVE; n++) {
default_sstate.b[n].queue_index = -1;
}
pthread_t thread;
for (int id = 0; id < max_slaves; id++) {
pthread_create(&thread, NULL, slave_thread, (void *)(intptr_t)id);
}
if (proxy_port) {
int proxy_sock = port_listen(proxy_port, max_slaves);
for (int id = 0; id < max_slaves; id++) {
pthread_create(&thread, NULL, proxy_thread, (void *)(intptr_t)proxy_sock);
}
}
}