static int process_msg()

in xenbus/xenbus_comms.c [205:328]


/*
 * process_msg - read and dispatch one incoming message, resumable across calls.
 *
 * Progress is kept in the function-static 'state', so a header or body that
 * xb_read() can only deliver partially is continued on the next call.
 *
 * A complete message is either passed to xs_watch_msg() (XS_WATCH_EVENT) or
 * matched by req_id against the entries on xs_reply_list and handed to the
 * matching request.
 *
 * Return: 0 if no data was pending, if more data is still needed, or if a
 * reply was dispatched; the result of xs_watch_msg() for watch events;
 * otherwise a negative errno: a failure from xb_read(), -EINVAL if the
 * announced payload exceeds XENSTORE_PAYLOAD_MAX, -ENOMEM if the message
 * buffer could not be allocated, -ENOENT if no pending request matches the
 * reply's req_id.
 */
static int process_msg(void)
{
	/* Parser state that survives between calls. */
	static struct {
		struct xsd_sockmsg msg;	/* header of the message being read */
		char *body;		/* where payload bytes are stored */
		union {
			void *alloc;	/* the allocation backing 'body' */
			struct xs_watch_event *watch; /* same, for watch events */
		};
		bool in_msg;		/* a message is in progress (lock held) */
		bool in_hdr;		/* still working on the header phase */
		unsigned int read;	/* bytes read so far in current phase */
	} state;
	struct xb_req_data *req;
	int err;
	unsigned int len;

	if (!state.in_msg) {
		state.in_msg = true;
		state.in_hdr = true;
		state.read = 0;

		/*
		 * We must disallow save/restore while reading a message.
		 * A partial read across s/r leaves us out of sync with
		 * xenstored.
		 * xs_response_mutex is locked as long as we are processing one
		 * message. state.in_msg will be true as long as we are holding
		 * the lock here.
		 */
		mutex_lock(&xs_response_mutex);

		if (!xb_data_to_read()) {
			/* We raced with save/restore: pending data 'gone'. */
			mutex_unlock(&xs_response_mutex);
			state.in_msg = false;
			return 0;
		}
	}

	if (state.in_hdr) {
		/*
		 * The header is already complete if a previous call got as
		 * far as the allocation below and failed there.
		 */
		if (state.read != sizeof(state.msg)) {
			err = xb_read((void *)&state.msg + state.read,
				      sizeof(state.msg) - state.read);
			if (err < 0)
				goto out;
			state.read += err;
			if (state.read != sizeof(state.msg))
				return 0;
			if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
				err = -EINVAL;
				goto out;
			}
		}

		/* One extra byte for the terminating NUL written below. */
		len = state.msg.len + 1;
		if (state.msg.type == XS_WATCH_EVENT)
			len += sizeof(*state.watch);

		/*
		 * On failure return without dropping xs_response_mutex and
		 * with in_msg/in_hdr still set: the next call skips the header
		 * read (state.read == sizeof(state.msg)) and retries the
		 * allocation.
		 */
		state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);
		if (!state.alloc)
			return -ENOMEM;

		/* Watch events carry their payload inside the event struct. */
		if (state.msg.type == XS_WATCH_EVENT)
			state.body = state.watch->body;
		else
			state.body = state.alloc;
		state.in_hdr = false;
		state.read = 0;
	}

	err = xb_read(state.body + state.read, state.msg.len - state.read);
	if (err < 0)
		goto out;

	state.read += err;
	if (state.read != state.msg.len)
		return 0;

	state.body[state.msg.len] = '\0';

	if (state.msg.type == XS_WATCH_EVENT) {
		state.watch->len = state.msg.len;
		/*
		 * NOTE(review): the event buffer is not freed here on any
		 * path, so xs_watch_msg() presumably takes ownership of it
		 * even when it fails - confirm against its definition.
		 */
		err = xs_watch_msg(state.watch);
	} else {
		/* Find and unlink the pending request this reply answers. */
		err = -ENOENT;
		mutex_lock(&xb_write_mutex);
		list_for_each_entry(req, &xs_reply_list, list) {
			if (req->msg.req_id == state.msg.req_id) {
				list_del(&req->list);
				err = 0;
				break;
			}
		}
		mutex_unlock(&xb_write_mutex);
		if (err)
			goto out;

		if (req->state == xb_req_state_wait_reply) {
			req->msg.req_id = req->caller_req_id;
			req->msg.type = state.msg.type;
			req->msg.len = state.msg.len;
			/* Ownership of the body moves to the request. */
			req->body = state.body;
			/* write body, then update state */
			virt_wmb();
			req->state = xb_req_state_got_reply;
			req->cb(req);
		} else {
			/*
			 * The request is no longer waiting for a reply, so the
			 * body is not handed to anyone: free it together with
			 * the request instead of leaking it when state.alloc
			 * is cleared below.
			 */
			kfree(req);
			kfree(state.alloc);
		}
	}

	mutex_unlock(&xs_response_mutex);

	/* The buffer has been handed over or freed; just forget it. */
	state.in_msg = false;
	state.alloc = NULL;
	return err;

 out:
	/* Error path: drop the lock and discard any partially read message. */
	mutex_unlock(&xs_response_mutex);
	state.in_msg = false;
	kfree(state.alloc);
	state.alloc = NULL;
	return err;
}