in xenbus/xenbus_comms.c [205:328]
/*
 * Read and dispatch at most one message using xb_read().
 *
 * Progress is kept in the function-local static 'state', so a message whose
 * header or body arrives in several pieces is resumed on a later call.
 *
 * Returns 0 when more data is needed or when a reply was handed over (or
 * discarded), the result of xs_watch_msg() for a watch event, or a negative
 * errno: an xb_read() failure, -EINVAL for an oversized payload, -ENOMEM when
 * the body buffer cannot be allocated, -ENOENT when no pending request
 * matches the reply's req_id.
 */
static int process_msg(void)
{
	static struct {
		struct xsd_sockmsg msg;		/* header being/already read */
		char *body;			/* where payload bytes go */
		union {
			void *alloc;		/* the kmalloc'ed buffer */
			struct xs_watch_event *watch;
		};
		bool in_msg;			/* a message is in progress */
		bool in_hdr;			/* still working on the header */
		unsigned int read;		/* bytes of header/body read so far */
	} state;
	struct xb_req_data *req;
	int err;
	unsigned int len;

	if (!state.in_msg) {
		state.in_msg = true;
		state.in_hdr = true;
		state.read = 0;

		/*
		 * We must disallow save/restore while reading a message.
		 * A partial read across s/r leaves us out of sync with
		 * xenstored.
		 * xs_response_mutex is locked as long as we are processing one
		 * message. state.in_msg will be true as long as we are holding
		 * the lock here.
		 */
		mutex_lock(&xs_response_mutex);

		if (!xb_data_to_read()) {
			/* We raced with save/restore: pending data 'gone'. */
			mutex_unlock(&xs_response_mutex);
			state.in_msg = false;
			return 0;
		}
	}

	if (state.in_hdr) {
		/*
		 * The header may already be complete if a previous call
		 * failed the allocation below; in that case only the
		 * allocation is retried.
		 */
		if (state.read != sizeof(state.msg)) {
			err = xb_read((void *)&state.msg + state.read,
				      sizeof(state.msg) - state.read);
			if (err < 0)
				goto out;
			state.read += err;
			if (state.read != sizeof(state.msg))
				return 0;
			if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
				err = -EINVAL;
				goto out;
			}
		}

		/* One extra byte for the terminating NUL added below. */
		len = state.msg.len + 1;
		if (state.msg.type == XS_WATCH_EVENT)
			len += sizeof(*state.watch);

		state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);
		if (!state.alloc) {
			/*
			 * Deliberately return with xs_response_mutex held and
			 * in_msg/in_hdr still set: the header has been
			 * consumed, so the next call must resume here.
			 */
			return -ENOMEM;
		}

		if (state.msg.type == XS_WATCH_EVENT)
			state.body = state.watch->body;
		else
			state.body = state.alloc;
		state.in_hdr = false;
		state.read = 0;
	}

	err = xb_read(state.body + state.read, state.msg.len - state.read);
	if (err < 0)
		goto out;

	state.read += err;
	if (state.read != state.msg.len)
		return 0;

	state.body[state.msg.len] = '\0';

	if (state.msg.type == XS_WATCH_EVENT) {
		state.watch->len = state.msg.len;
		/*
		 * NOTE(review): the buffer is not freed here regardless of
		 * the result, so xs_watch_msg() is presumably responsible
		 * for it on both success and failure - confirm.
		 */
		err = xs_watch_msg(state.watch);
	} else {
		/* Find and unlink the request this reply belongs to. */
		err = -ENOENT;
		mutex_lock(&xb_write_mutex);
		list_for_each_entry(req, &xs_reply_list, list) {
			if (req->msg.req_id == state.msg.req_id) {
				list_del(&req->list);
				err = 0;
				break;
			}
		}
		mutex_unlock(&xb_write_mutex);
		if (err)
			goto out;

		if (req->state == xb_req_state_wait_reply) {
			req->msg.req_id = req->caller_req_id;
			req->msg.type = state.msg.type;
			req->msg.len = state.msg.len;
			/* The request now owns the body buffer. */
			req->body = state.body;
			/* write body, then update state */
			virt_wmb();
			req->state = xb_req_state_got_reply;
			req->cb(req);
		} else {
			/*
			 * The request is no longer waiting for a reply, so the
			 * body was never handed to anyone. Free it together
			 * with the request instead of leaking it when
			 * state.alloc is cleared below.
			 */
			kfree(req);
			kfree(state.alloc);
		}
	}

	mutex_unlock(&xs_response_mutex);

	state.in_msg = false;
	/* Buffer ownership has been transferred or the buffer was freed. */
	state.alloc = NULL;
	return err;

out:
	mutex_unlock(&xs_response_mutex);
	state.in_msg = false;
	kfree(state.alloc);
	state.alloc = NULL;
	return err;
}