static int pvcalls_back_release_active()

in pvcalls-back.c [87:165]


static int pvcalls_back_release_active(struct xenbus_device *dev,
				       struct pvcalls_fedata *fedata,
				       struct sock_mapping *map);

static bool pvcalls_conn_back_read(void *opaque)
{
	struct sock_mapping *map = (struct sock_mapping *)opaque;
	struct msghdr msg;
	struct kvec vec[2];
	RING_IDX cons, prod, size, wanted, array_size, masked_prod, masked_cons;
	int32_t error;
	struct pvcalls_data_intf *intf = map->ring;
	struct pvcalls_data *data = &map->data;
	unsigned long flags;
	int ret;

	array_size = XEN_FLEX_RING_SIZE(map->ring_order);
	cons = intf->in_cons;
	prod = intf->in_prod;
	error = intf->in_error;
	/* read the indexes first, then deal with the data */
	virt_mb();

	if (error)
		return false;

	size = pvcalls_queued(prod, cons, array_size);
	if (size >= array_size)
		return false;
	spin_lock_irqsave(&map->sock->sk->sk_receive_queue.lock, flags);
	if (skb_queue_empty(&map->sock->sk->sk_receive_queue)) {
		atomic_set(&map->read, 0);
		spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock,
				flags);
		return true;
	}
	spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock, flags);
	wanted = array_size - size;
	masked_prod = pvcalls_mask(prod, array_size);
	masked_cons = pvcalls_mask(cons, array_size);

	memset(&msg, 0, sizeof(msg));
	if (masked_prod < masked_cons) {
		vec[0].iov_base = data->in + masked_prod;
		vec[0].iov_len = wanted;
		iov_iter_kvec(&msg.msg_iter, WRITE, vec, 1, wanted);
	} else {
		vec[0].iov_base = data->in + masked_prod;
		vec[0].iov_len = array_size - masked_prod;
		vec[1].iov_base = data->in;
		vec[1].iov_len = wanted - vec[0].iov_len;
		iov_iter_kvec(&msg.msg_iter, WRITE, vec, 2, wanted);
	}

	atomic_set(&map->read, 0);
	ret = inet_recvmsg(map->sock, &msg, wanted, MSG_DONTWAIT);
	WARN_ON(ret > wanted);
	if (ret == -EAGAIN) /* shouldn't happen */
		return true;
	if (!ret)
		ret = -ENOTCONN;
	spin_lock_irqsave(&map->sock->sk->sk_receive_queue.lock, flags);
	if (ret > 0 && !skb_queue_empty(&map->sock->sk->sk_receive_queue))
		atomic_inc(&map->read);
	spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock, flags);

	/* write the data, then modify the indexes */
	virt_wmb();
	if (ret < 0) {
		atomic_set(&map->read, 0);
		intf->in_error = ret;
	} else
		intf->in_prod = prod + ret;
	/* update the indexes, then notify the other end */
	virt_wmb();
	notify_remote_via_irq(map->irq);

	return true;
}