in drbd/drbd_receiver.c [2836:3047]
static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
{
struct drbd_peer_device *peer_device;
struct drbd_device *device;
sector_t sector;
sector_t capacity;
struct drbd_peer_request *peer_req;
struct digest_info *di = NULL;
int size, verb;
unsigned int fault_type;
struct p_block_req *p = pi->data;
peer_device = conn_peer_device(connection, pi->vnr);
if (!peer_device)
return -EIO;
device = peer_device->device;
capacity = get_capacity(device->vdisk);
sector = be64_to_cpu(p->sector);
size = be32_to_cpu(p->blksize);
if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
(unsigned long long)sector, size);
return -EINVAL;
}
if (sector + (size>>9) > capacity) {
drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
(unsigned long long)sector, size);
return -EINVAL;
}
if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
verb = 1;
switch (pi->cmd) {
case P_DATA_REQUEST:
drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
break;
case P_RS_THIN_REQ:
case P_RS_DATA_REQUEST:
case P_CSUM_RS_REQUEST:
case P_OV_REQUEST:
drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
break;
case P_OV_REPLY:
verb = 0;
dec_rs_pending(device);
drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
break;
default:
BUG();
}
if (verb && __ratelimit(&drbd_ratelimit_state))
drbd_err(device, "Can not satisfy peer's read request, "
"no local data.\n");
/* drain possibly payload */
return drbd_drain_block(peer_device, pi->size);
}
/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
* "criss-cross" setup, that might cause write-out on some other DRBD,
* which in turn might block on the other node at this very place. */
peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
size, GFP_NOIO);
if (!peer_req) {
put_ldev(device);
return -ENOMEM;
}
switch (pi->cmd) {
case P_DATA_REQUEST:
peer_req->w.cb = w_e_end_data_req;
fault_type = DRBD_FAULT_DT_RD;
/* application IO, don't drbd_rs_begin_io */
peer_req->flags |= EE_APPLICATION;
goto submit;
case P_RS_THIN_REQ:
/* If at some point in the future we have a smart way to
find out if this data block is completely deallocated,
then we would do something smarter here than reading
the block... */
peer_req->flags |= EE_RS_THIN_REQ;
fallthrough;
case P_RS_DATA_REQUEST:
peer_req->w.cb = w_e_end_rsdata_req;
fault_type = DRBD_FAULT_RS_RD;
/* used in the sector offset progress display */
device->bm_resync_fo = BM_SECT_TO_BIT(sector);
break;
case P_OV_REPLY:
case P_CSUM_RS_REQUEST:
fault_type = DRBD_FAULT_RS_RD;
di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
if (!di)
goto out_free_e;
di->digest_size = pi->size;
di->digest = (((char *)di)+sizeof(struct digest_info));
peer_req->digest = di;
peer_req->flags |= EE_HAS_DIGEST;
if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
goto out_free_e;
if (pi->cmd == P_CSUM_RS_REQUEST) {
D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
peer_req->w.cb = w_e_end_csum_rs_req;
/* used in the sector offset progress display */
device->bm_resync_fo = BM_SECT_TO_BIT(sector);
/* remember to report stats in drbd_resync_finished */
device->use_csums = true;
} else if (pi->cmd == P_OV_REPLY) {
/* track progress, we may need to throttle */
atomic_add(size >> 9, &device->rs_sect_in);
peer_req->w.cb = w_e_end_ov_reply;
dec_rs_pending(device);
/* drbd_rs_begin_io done when we sent this request,
* but accounting still needs to be done. */
goto submit_for_resync;
}
break;
case P_OV_REQUEST:
if (device->ov_start_sector == ~(sector_t)0 &&
peer_device->connection->agreed_pro_version >= 90) {
unsigned long now = jiffies;
int i;
device->ov_start_sector = sector;
device->ov_position = sector;
device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
device->rs_total = device->ov_left;
for (i = 0; i < DRBD_SYNC_MARKS; i++) {
device->rs_mark_left[i] = device->ov_left;
device->rs_mark_time[i] = now;
}
drbd_info(device, "Online Verify start sector: %llu\n",
(unsigned long long)sector);
}
peer_req->w.cb = w_e_end_ov_req;
fault_type = DRBD_FAULT_RS_RD;
break;
default:
BUG();
}
/* Throttle, drbd_rs_begin_io and submit should become asynchronous
* wrt the receiver, but it is not as straightforward as it may seem.
* Various places in the resync start and stop logic assume resync
* requests are processed in order, requeuing this on the worker thread
* introduces a bunch of new code for synchronization between threads.
*
* Unlimited throttling before drbd_rs_begin_io may stall the resync
* "forever", throttling after drbd_rs_begin_io will lock that extent
* for application writes for the same time. For now, just throttle
* here, where the rest of the code expects the receiver to sleep for
* a while, anyways.
*/
/* Throttle before drbd_rs_begin_io, as that locks out application IO;
* this defers syncer requests for some time, before letting at least
* on request through. The resync controller on the receiving side
* will adapt to the incoming rate accordingly.
*
* We cannot throttle here if remote is Primary/SyncTarget:
* we would also throttle its application reads.
* In that case, throttling is done on the SyncTarget only.
*/
/* Even though this may be a resync request, we do add to "read_ee";
* "sync_ee" is only used for resync WRITEs.
* Add to list early, so debugfs can find this request
* even if we have to sleep below. */
spin_lock_irq(&device->resource->req_lock);
list_add_tail(&peer_req->w.list, &device->read_ee);
spin_unlock_irq(&device->resource->req_lock);
update_receiver_timing_details(connection, drbd_rs_should_slow_down);
if (device->state.peer != R_PRIMARY
&& drbd_rs_should_slow_down(device, sector, false))
schedule_timeout_uninterruptible(HZ/10);
update_receiver_timing_details(connection, drbd_rs_begin_io);
if (drbd_rs_begin_io(device, sector))
goto out_free_e;
submit_for_resync:
atomic_add(size >> 9, &device->rs_sect_ev);
submit:
update_receiver_timing_details(connection, drbd_submit_peer_request);
inc_unacked(device);
if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
fault_type) == 0)
return 0;
/* don't care for the reason here */
drbd_err(device, "submit failed, triggering re-connect\n");
out_free_e:
spin_lock_irq(&device->resource->req_lock);
list_del(&peer_req->w.list);
spin_unlock_irq(&device->resource->req_lock);
/* no drbd_rs_complete_io(), we are dropping the connection anyways */
put_ldev(device);
drbd_free_peer_req(device, peer_req);
return -EIO;
}