in erts/etc/common/inet_gethost.c [701:1012]
static void main_loop(void)
{
AddrByte *inbuff = NULL;
int insize;
int i,w;
#ifdef WIN32
HANDLE handles[64];
DWORD num_handles;
DWORD index;
QueItem *qi;
#else
size_t inbuff_size = 0;
fd_set fds;
int max_fd;
#endif
int new_data;
int save_serial;
/* It's important that the free workers list is handled first */
Worker *workers[3] = {free_workers, busy_workers, stalled_workers};
int *wsizes[3] = {&num_free_workers, &num_busy_workers,
&num_stalled_workers};
int (*handlers[3])(int) = {&handle_io_free, &handle_io_busy,
&handle_io_stalled};
Worker *cw;
AddrByte domainbuff[DOMAINNAME_MAX];
#ifdef WIN32
{
DWORD dummy;
/* Create the reader and writer */
if ((!create_mesq(&to_erlang)) || (!create_mesq(&from_erlang))) {
fatal("Could not create message que! errno = %d.",GetLastError());
}
if (((HANDLE) _beginthreadex(NULL,0,writer,to_erlang,0,&dummy))
== NULL) {
fatal("Could not create writer thread! errno = %d.",GetLastError());
}
if (((HANDLE) _beginthreadex(NULL,0,reader,from_erlang,0,&dummy))
== NULL) {
fatal("Could not create reader thread! errno = %d.",GetLastError());
}
DEBUGF(4,("Created reader and writer threads."));
#ifdef HARDDEBUG
poll_gethost(__LINE__);
#endif
}
#endif
for(;;) {
#ifdef WIN32
num_handles = 0;
handles[num_handles++] = event_mesq(from_erlang);
for (w = 0; w < 3; ++w) {
for (i = 0; i < *wsizes[w]; ++i) {
handles[num_handles++] = event_mesq(workers[w][i].readfrom);
}
}
if ((index = WaitForMultipleObjects(num_handles, handles, FALSE, INFINITE))
== WAIT_FAILED) {
fatal("Could not WaitForMultpleObjects! errno = %d.",GetLastError());
}
w = 0;
index -= WAIT_OBJECT_0;
DEBUGF(4,("Got data on index %d.",index));
if (index > 0) {
if (((int)index - 1) < *wsizes[0]) {
(*handlers[0])(index - 1);
} else if (((int)index - 1) < ((*wsizes[0]) + (*wsizes[1]))) {
(*handlers[1])(index - 1 - (*wsizes[0]));
} else {
(*handlers[2])(index - 1 - (*wsizes[0]) - (*wsizes[1]));
}
}
new_data = (index == 0);
#else
max_fd = 0;
FD_ZERO(&fds);
FD_SET(0,&fds);
for (w = 0; w < 3; ++w) {
for (i = 0; i < *wsizes[w]; ++i) {
FD_SET(workers[w][i].readfrom,&fds);
if (workers[w][i].readfrom > max_fd) {
max_fd = workers[w][i].readfrom;
}
}
}
for (;;) {
if (select(max_fd + 1,&fds,NULL,NULL,NULL) < 0) {
if (errno == EINTR) {
continue;
} else {
fatal("Select failed (invalid internal structures?), "
"errno = %d.",errno);
}
}
break;
}
for (w = 0; w < 3; ++w) {
for (i = 0; i < *wsizes[w]; ++i) {
if (FD_ISSET(workers[w][i].readfrom, &fds)) {
int hres = (*handlers[w])(i);
if (hres < 0) {
return;
} else {
i -= hres; /* We'll retry this position, if hres == 1.
The position is usually
replaced with another worker,
a worker with
I/O usually changes state as we
use blocking file I/O */
}
}
}
}
new_data = FD_ISSET(0,&fds);
#endif
check_que();
/* Now check for new requests... */
if (new_data) { /* Erlang... */
OpType op;
#ifdef WIN32
if (!deque_mesq(from_erlang,&qi)) {
DEBUGF(1,("Erlang has closed."));
return;
}
insize = qi->req_size;
inbuff = qi->request;
DEBUGF(4,("Got data from erlang."));
DEBUGF(4,("OPeration == %d.",get_op(inbuff)));
#else
insize = read_request(&inbuff, &inbuff_size);
if (insize == 0) { /* Other errors taken care of in
read_request */
DEBUGF(1,("Erlang has closed."));
return;
}
#endif
op = get_op(inbuff);
if (op == OP_CANCEL_REQUEST) {
SerialType serial = get_serial(inbuff);
if (!clean_que_of(serial)) {
for (i = 0; i < num_busy_workers; ++i) {
if (busy_workers[i].serial == serial) {
if (busy_workers[i].que_size) {
restart_worker(&busy_workers[i]);
start_que_request(&busy_workers[i]);
} else {
stall_worker(i);
check_que();
}
break;
}
}
}
#ifdef WIN32
FREE(qi);
#endif
continue; /* New select */
} else if (op == OP_CONTROL) {
CtlType ctl;
SerialType serial = get_serial(inbuff);
if (serial != INVALID_SERIAL) {
fatal("Invalid serial: %d.", serial);
}
switch (ctl = get_ctl(inbuff)) {
case SETOPT_DEBUG_LEVEL:
{
int tmp_debug_level = get_debug_level(inbuff);
#ifdef WIN32
if (debug_console_allocated == INVALID_HANDLE_VALUE &&
tmp_debug_level > 0) {
DWORD res;
do_allocate_console();
WriteFile(debug_console_allocated,
"Hej\n",4,&res,NULL);
}
#endif
debug_level = tmp_debug_level;
DEBUGF(debug_level, ("debug_level = %d", debug_level));
for (w = 0; w < 3; ++w) {
for (i = 0; i < *wsizes[w]; i++) {
int res;
#ifdef WIN32
QueItem *m;
#endif
cw = &(workers[w][i]);
#ifdef WIN32
m = ALLOC(sizeof(QueItem) - 1 + qi->req_size);
memcpy(m->request, qi->request,
(m->req_size = qi->req_size));
m->next = NULL;
if ((res = send_mes_to_worker(m, cw)) != 0) {
FREE(m);
}
#else
res = send_request_to_worker(inbuff, insize, cw);
#endif
if (res != 0) {
kill_worker(cw);
(*wsizes[w])--;
*cw = workers[w][*wsizes[w]];
}
}
}
}
break;
default:
warning("Unknown control requested from erlang (%d), "
"message discarded.", (int) ctl);
break;
}
#ifdef WIN32
FREE(qi);
#endif
continue; /* New select */
} else {
ProtoType proto;
if (op != OP_GETHOSTBYNAME && op != OP_GETHOSTBYADDR) {
warning("Unknown operation requested from erlang (%d), "
"message discarded.", op);
#ifdef WIN32
FREE(qi);
#endif
continue;
}
if ((proto = get_proto(inbuff)) != PROTO_IPV4 &&
proto != PROTO_IPV6) {
warning("Unknown protocol requested from erlang (%d), "
"message discarded.", proto);
#ifdef WIN32
FREE(qi);
#endif
continue;
}
if (get_domainname(inbuff,insize,domainbuff) < 0) {
warning("Malformed message sent from erlang, no domain, "
"message discarded.", op);
#ifdef WIN32
FREE(qi);
#endif
continue;
}
}
if (BEE_GREEDY()) {
DEBUGF(4,("Beeing greedy!"));
if ((cw = pick_worker_greedy(domainbuff)) != NULL) {
/* Put it in the worker specific que if the
domainname matches... */
#ifndef WIN32
QueItem *qi = ALLOC(sizeof(QueItem) - 1 +
insize);
qi->req_size = insize;
memcpy(&(qi->request), inbuff, insize);
qi->next = NULL;
#endif
if (!cw->que_first) {
cw->que_first = cw->que_last = qi;
} else {
cw->que_last->next = qi;
cw->que_last = qi;
}
++(cw->que_size);
continue;
}
/* Otherwise busyness as usual */
}
save_serial = get_serial(inbuff);
while ((cw = pick_worker()) != NULL) {
int res;
#ifdef WIN32
res = send_mes_to_worker(qi,cw);
#else
res = send_request_to_worker(inbuff, insize, cw);
#endif
if (res == 0) {
break;
} else {
kill_last_picked_worker();
}
}
if (cw == NULL) {
/* Insert into que */
#ifndef WIN32
QueItem *qi = ALLOC(sizeof(QueItem) - 1 +
insize);
qi->req_size = insize;
memcpy(&(qi->request), inbuff, insize);
qi->next = NULL;
#endif
if (!que_first) {
que_first = que_last = qi;
} else {
que_last->next = qi;
que_last = qi;
}
} else {
cw->serial = save_serial;
domaincopy(cw->domain, domainbuff);
}
}
}
}