in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java [1411:1592]
private TPipeConsensusTransferResp onRequest(
final TPipeConsensusTransferReq req,
final boolean isTransferTsFilePiece,
final boolean isTransferTsFileSeal) {
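// Requests may arrive out of order over asynchronous RPC. This method serializes them under a
// single lock and re-orders them via reqExecutionOrderBuffer: the thread whose request can be
// applied next proceeds, while the others wait on `condition` until signalled or timed out.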
long startAcquireLockNanos = System.nanoTime();
lock.lock();
try {
if (isClosed.get()) {
return PipeConsensusReceiverAgent.closedResp(
consensusPipeName.toString(), req.getCommitId());
}
long startDispatchNanos = System.nanoTime();
metric.recordAcquireExecutorLockTimer(startDispatchNanos - startAcquireLockNanos);
TCommitId tCommitId = req.getCommitId();
RequestMeta requestMeta = new RequestMeta(tCommitId);
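// requestMeta is this request's entry in reqExecutionOrderBuffer; the buffer keeps entries
// ordered so that first() is always the request expected to be applied next.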
LOGGER.info(
"PipeConsensus-PipeName-{}: start to receive no.{} event",
consensusPipeName,
tCommitId);
// If a req is deprecated, we discard it.
// This can happen in the following scenario: the leader has transferred {1, 2} and intends to
// transfer {3, 4, 5, 6}. At some point the follower has received {4, 5, 6}, while {3} is still
// in flight due to network latency. The leader then restarts and resends {3, 4, 5, 6} with an
// incremented rebootTimes. If the {3} sent before the leader restarted arrives after the
// follower has received a request carrying the incremented rebootTimes, that stale {3} must be
// discarded.
if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
return deprecatedResp(THIS_NODE);
}
// Similarly, check pipeTask restartTimes
if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
&& tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
return deprecatedResp(PIPE_TASK);
}
// Check whether the connector has rebooted: if the request's rebootTimes is greater than
// connectorRebootTimes, reset the receiver because the connector has been restarted.
if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) {
resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes());
}
// Similarly, check pipeTask restartTimes
if (tCommitId.getPipeTaskRestartTimes() > pipeTaskRestartTimes) {
resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes());
}
// update metric
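// All piece and seal requests of one TsFileInsertionEvent share the same commitId, so the
// contains() check prevents counting the same TsFile event more than once.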
if (isTransferTsFilePiece && !reqExecutionOrderBuffer.contains(requestMeta)) {
// Only update tsFileEventCount when the tsFileEvent is enqueued for the first time.
tsFileEventCount.incrementAndGet();
}
if (!isTransferTsFileSeal && !isTransferTsFilePiece) {
WALEventCount.incrementAndGet();
}
reqExecutionOrderBuffer.add(requestMeta);
// A TsFilePieceTransferEvent does not go through the full procedure; it only holds a place in
// the buffer. It can be dequeued only after the corresponding seal event has been processed.
if (isTransferTsFilePiece) {
long startApplyNanos = System.nanoTime();
metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
requestMeta.setStartApplyNanos(startApplyNanos);
return null;
}
if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getIotConsensusV2PipelineSize()
&& !reqExecutionOrderBuffer.first().equals(requestMeta)) {
// If the reqBuffer is full and the current thread does not hold the buffer's peek, this req
// is not supposed to be processed now. The current thread should therefore notify the other
// threads so that the one holding the peek can process it.
condition.signalAll();
}
// Poll until this request can be applied.
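// The request is applied in one of three cases: (1) it is the buffer's head and carries the
// next expected replicateIndex, (2) the buffer has reached the pipeline size and this request
// is the head, or (3) the wait times out with a non-full buffer and this request is the head.
// Otherwise the thread keeps waiting, unless the receiver is closed, the request becomes
// deprecated, or the thread is interrupted.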
while (true) {
if (reqExecutionOrderBuffer.first().equals(requestMeta)
&& tCommitId.getReplicateIndex() == onSyncedReplicateIndex + 1) {
long startApplyNanos = System.nanoTime();
metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
requestMeta.setStartApplyNanos(startApplyNanos);
// If the current req is supposed to be processed, load this event through the
// DataRegionStateMachine.
TPipeConsensusTransferResp resp = loadEvent(req);
// The req is removed from the buffer and onSyncedCommitIndex is updated only when the event
// is applied successfully and what was transmitted is not a TsFilePiece. Because the pipe
// transfers multiple reqs with the same commitId for a single TsFileInsertionEvent, the
// event can be discarded only after the last seal req has been applied.
if (resp != null
&& resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
onSuccess(tCommitId, isTransferTsFileSeal);
}
return resp;
}
if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getIotConsensusV2PipelineSize()
&& reqExecutionOrderBuffer.first().equals(requestMeta)) {
// TODO: Turn it to debug after GA
LOGGER.info(
"PipeConsensus-PipeName-{}: no.{} event get executed because receiver buffer's len >= pipeline, current receiver syncIndex {}, current buffer len {}",
consensusPipeName,
tCommitId,
onSyncedReplicateIndex,
reqExecutionOrderBuffer.size());
long startApplyNanos = System.nanoTime();
metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
requestMeta.setStartApplyNanos(startApplyNanos);
// If the reqBuffer is full and its peek is held by the current thread, load this event.
TPipeConsensusTransferResp resp = loadEvent(req);
if (resp != null
&& resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
onSuccess(tCommitId, isTransferTsFileSeal);
}
return resp;
} else {
// If this req is not supposed to be processed yet and the reqBuffer is not full, the current
// thread should wait until the reqBuffer becomes full, which indicates the receiver has
// received all the requests from the connector without duplication or loss.
try {
boolean timeout =
!condition.await(
PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS, TimeUnit.MILLISECONDS);
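// Condition.await(time, unit) returns false when the waiting time elapses without a signal,
// so `timeout` is true if this thread was not woken up within the configured waiting time.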
if (isClosed.get()) {
return PipeConsensusReceiverAgent.closedResp(
consensusPipeName.toString(), req.getCommitId());
}
// If a req finds that the buffer no longer contains its requestMeta after waking up from
// condition.await, it may mean that, during the wait, reqs with a newer pipeTaskRestartTimes
// or rebootTimes arrived and refreshed the request buffer. In that case we need to discard
// this request.
if (!reqExecutionOrderBuffer.contains(requestMeta)) {
return deprecatedResp(String.format("%s or %s", THIS_NODE, PIPE_TASK));
}
// If the buffer is still not full after the wait times out, we assume the sender will not
// send any more events for now, i.e. the sender has sent all events. At this point we apply
// the event at the reqBuffer's peek.
if (timeout
&& reqExecutionOrderBuffer.size() < IOTDB_CONFIG.getIotConsensusV2PipelineSize()
&& reqExecutionOrderBuffer.first() != null
&& reqExecutionOrderBuffer.first().equals(requestMeta)) {
// TODO: Turn it to debug after GA
LOGGER.info(
"PipeConsensus-PipeName-{}: no.{} event get executed after awaiting timeout, current receiver syncIndex: {}",
consensusPipeName,
tCommitId,
onSyncedReplicateIndex);
long startApplyNanos = System.nanoTime();
metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
requestMeta.setStartApplyNanos(startApplyNanos);
TPipeConsensusTransferResp resp = loadEvent(req);
if (resp != null
&& resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
onSuccess(tCommitId, isTransferTsFileSeal);
}
return resp;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(
"PipeConsensus-PipeName-{}: current waiting is interrupted. onSyncedCommitIndex: {}. Exception: ",
consensusPipeName,
tCommitId.getReplicateIndex(),
e);
// Avoid infinite loop when RPC thread is killed by OS
return new TPipeConsensusTransferResp(
RpcUtils.getStatus(
TSStatusCode.SHUT_DOWN_ERROR,
"RPC processor is interrupted by shutdown hook when wait on condition!"));
}
}
}
} finally {
// Wake up any threads that may still be waiting so that they become active and contend for
// the lock again, instead of sleeping pointlessly on the condition after this lock is released.
condition.signalAll();
lock.unlock();
}
}