static int receive_Data()

in drbd/drbd_receiver.c [2586:2754]


/*
 * receive_Data() - handle a data packet received from the peer and submit
 * the resulting peer request to the local backing device.
 *
 * @connection: connection the packet arrived on; its current epoch,
 *              agreed_features and epoch_lock are used here.
 * @pi:         packet info. pi->data is interpreted as a struct p_data
 *              header, pi->vnr selects the peer device, pi->cmd
 *              distinguishes P_TRIM / P_ZEROES from other commands, and
 *              pi->size is the payload size handed to the ack/drain helpers.
 *
 * Outline:
 *  1. Look up the peer device for pi->vnr.
 *  2. If get_ldev() fails, negatively acknowledge the packet (P_NEG_ACK),
 *     account for it in the current epoch and drain the payload.
 *  3. Otherwise build a peer request with read_in_block(), derive the bio
 *     op/flags and EE_* flags from the wire dp_flags, and attach the
 *     request to the connection's current epoch.
 *  4. Work out which acks are due (from dp_flags, or from the configured
 *     wire protocol when agreed_pro_version < 100).
 *  5. Update the peer sequence number; with two primaries also run
 *     conflict handling.
 *  6. Queue the request on active_ee (unless it is a TRIM / WRITE_SAME /
 *     ZEROOUT request) and submit it.
 *
 * Return: 0 if the request was submitted, or if handle_write_conflicts()
 * returned -ENOENT; -EIO if the peer device or the peer request could not
 * be obtained; otherwise the error reported by the failing helper.
 */
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct net_conf *nc;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	/* Wire fields are big-endian; convert before use. */
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int op, op_flags;
	u32 dp_flags;
	int err, tp;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (!get_ldev(device)) {
		/*
		 * No local-disk reference could be taken, so the write cannot
		 * be applied: keep the sequence number and epoch accounting
		 * going, tell the peer with P_NEG_ACK, and consume the
		 * payload via drbd_drain_block().
		 */
		int err2;

		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
		atomic_inc(&connection->current_epoch->epoch_size);
		err2 = drbd_drain_block(peer_device, pi->size);
		/* Report the first error: the seq wait result wins over the drain result. */
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
	if (!peer_req) {
		put_ldev(device);
		return -EIO;
	}

	/* e_end_block is installed as the work callback of this request. */
	peer_req->w.cb = e_end_block;
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_APPLICATION;

	/* Translate the wire-level dp_flags into the op and flags used at submit time. */
	dp_flags = be32_to_cpu(p->dp_flags);
	op = wire_flags_to_bio_op(dp_flags);
	op_flags = wire_flags_to_bio_flags(dp_flags);
	if (pi->cmd == P_TRIM) {
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
		D_ASSERT(peer_device, peer_req->pages == NULL);
		/* need to play safe: an older DRBD sender
		 * may mean zero-out while sending P_TRIM. */
		if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
			peer_req->flags |= EE_ZEROOUT;
	} else if (pi->cmd == P_ZEROES) {
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
		D_ASSERT(peer_device, peer_req->pages == NULL);
		/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
		if (dp_flags & DP_DISCARD)
			peer_req->flags |= EE_TRIM;
	} else if (peer_req->pages == NULL) {
		/* A request without pages is expected to be an empty request carrying DP_FLUSH. */
		D_ASSERT(device, peer_req->i.size == 0);
		D_ASSERT(device, dp_flags & DP_FLUSH);
	}

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	/*
	 * Bind the request to the connection's current epoch and bump both
	 * the epoch's size and active counters, all under epoch_lock.
	 */
	spin_lock(&connection->epoch_lock);
	peer_req->epoch = connection->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&connection->epoch_lock);

	/*
	 * net_conf is read under the RCU read lock. When the agreed protocol
	 * version is below 100, the ack flags are derived from the configured
	 * wire protocol (C -> write ack, B -> receive ack) and OR-ed into
	 * dp_flags; presumably such peers do not send them on the wire --
	 * TODO confirm.
	 */
	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	tp = nc->two_primaries;
	if (peer_device->connection->agreed_pro_version < 100) {
		switch (nc->wire_protocol) {
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
		}
	}
	rcu_read_unlock();

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		/* NOTE(review): no dec_unacked() is issued on the error paths
		 * of this function -- confirm it is balanced elsewhere. */
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
	}

	/*
	 * Both branches below leave req_lock held on their success path; it
	 * is released after the active_ee insertion further down.
	 */
	if (tp) {
		/* two primaries implies protocol C */
		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
		peer_req->flags |= EE_IN_INTERVAL_TREE;
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&device->resource->req_lock);
		err = handle_write_conflicts(device, peer_req);
		if (err) {
			spin_unlock_irq(&device->resource->req_lock);
			if (err == -ENOENT) {
				/*
				 * -ENOENT is reported to the caller as success.
				 * NOTE(review): peer_req is neither freed nor
				 * is its epoch released here; presumably
				 * handle_write_conflicts() has already taken
				 * care of the request -- confirm.
				 */
				put_ldev(device);
				return 0;
			}
			goto out_interrupted;
		}
	} else {
		update_peer_seq(peer_device, peer_seq);
		spin_lock_irq(&device->resource->req_lock);
	}
	/* TRIM and WRITE_SAME are processed synchronously,
	 * we wait for all pending requests, respectively wait for
	 * active_ee to become empty in drbd_submit_peer_request();
	 * better not add ourselves here. */
	if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
		list_add_tail(&peer_req->w.list, &device->active_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* While the connection state is C_SYNC_TARGET, block until no resync write overlaps this request. */
	if (device->state.conn == C_SYNC_TARGET)
		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));

	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		/*
		 * Mark the range out of sync, do not let completion set it in
		 * sync, and start an activity-log transaction that must be
		 * completed later (tracked by EE_CALL_AL_COMPLETE_IO).
		 */
		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(device, &peer_req->i);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
	}

	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
				       DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	/*
	 * Undo what was set up above: take the request off its list, remove
	 * its interval, and complete the activity-log transaction if one was
	 * begun.
	 * NOTE(review): list_del() also runs for EE_TRIM / EE_WRITE_SAME /
	 * EE_ZEROOUT requests, which skipped list_add_tail() above;
	 * presumably drbd_submit_peer_request() has listed them by then --
	 * confirm.
	 */
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(device, peer_req);
	spin_unlock_irq(&device->resource->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}

	/*
	 * Common failure exit: drop this request's share of the epoch, release
	 * the local-disk reference taken by get_ldev(), and free the request.
	 */
out_interrupted:
	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return err;
}