private TPipeConsensusTransferResp onRequest()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java [1411:1592]
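
This method is the serialization point for incoming PipeConsensus transfer requests on the receiver side. Under a single lock it discards requests that are stale with respect to connector reboots or pipe-task restarts, enqueues the request into a sorted execution buffer, and then blocks until the request may be applied: either it is the next in-order event, or it is the buffer's peek while the pipeline is saturated, or it is the peek after the wait times out.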


    private TPipeConsensusTransferResp onRequest(
        final TPipeConsensusTransferReq req,
        final boolean isTransferTsFilePiece,
        final boolean isTransferTsFileSeal) {
      long startAcquireLockNanos = System.nanoTime();
      lock.lock();
      try {
        if (isClosed.get()) {
          return PipeConsensusReceiverAgent.closedResp(
              consensusPipeName.toString(), req.getCommitId());
        }
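        // Record how long this thread waited to acquire the executor lock.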
        long startDispatchNanos = System.nanoTime();
        metric.recordAcquireExecutorLockTimer(startDispatchNanos - startAcquireLockNanos);

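        // The commit id carries (dataNodeRebootTimes, pipeTaskRestartTimes, replicateIndex);
        // these fields are used below to detect stale requests and to order execution.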
        TCommitId tCommitId = req.getCommitId();
        RequestMeta requestMeta = new RequestMeta(tCommitId);
        LOGGER.info(
            "PipeConsensus-PipeName-{}: start to receive no.{} event",
            consensusPipeName,
            tCommitId);
        // If a req is deprecated, we discard it. This can happen in the following scenario:
        // the leader has transferred {1, 2} and intends to transfer {3, 4, 5, 6}; at some
        // point the follower has received {4, 5, 6} while {3} is still in flight due to
        // network latency. If the leader then restarts, it resends {3, 4, 5, 6} with an
        // incremented rebootTimes. Should the {3} sent before the restart arrive after the
        // follower has received a request with the newer rebootTimes, that stale {3} must
        // be discarded.
        if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
          return deprecatedResp(THIS_NODE);
        }
        // Similarly, check pipeTask restartTimes
        if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
            && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
          return deprecatedResp(PIPE_TASK);
        }
        // Check whether the connector has rebooted: if the incoming rebootTimes is greater
        // than connectorRebootTimes, the connector has restarted and the receiver must be
        // reset.
        if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) {
          resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes());
        }
        // Similarly, check pipeTask restartTimes
        if (tCommitId.getPipeTaskRestartTimes() > pipeTaskRestartTimes) {
          resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes());
        }
        // Update metrics
        if (isTransferTsFilePiece && !reqExecutionOrderBuffer.contains(requestMeta)) {
          // Only update tsFileEventCount when the tsFile event is enqueued for the first time.
          tsFileEventCount.incrementAndGet();
        }
        if (!isTransferTsFileSeal && !isTransferTsFilePiece) {
          WALEventCount.incrementAndGet();
        }
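        // Enqueue this request into the sorted execution buffer; the buffer's peek is always
        // the next candidate to be applied.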
        reqExecutionOrderBuffer.add(requestMeta);

        // A TsFile piece transfer event does not enter the further procedure; it merely holds
        // a place in the buffer. It can only be dequeued after the corresponding seal event
        // has been processed.
        if (isTransferTsFilePiece) {
          long startApplyNanos = System.nanoTime();
          metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
          requestMeta.setStartApplyNanos(startApplyNanos);
          return null;
        }

        if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getIotConsensusV2PipelineSize()
            && !reqExecutionOrderBuffer.first().equals(requestMeta)) {
          // If the reqBuffer is full and the current thread does not hold the buffer's peek,
          // this req is not supposed to be processed yet, so notify the corresponding
          // threads to process the peek.
          condition.signalAll();
        }

        // Poll until this request is allowed to be processed
        while (true) {
          if (reqExecutionOrderBuffer.first().equals(requestMeta)
              && tCommitId.getReplicateIndex() == onSyncedReplicateIndex + 1) {
            long startApplyNanos = System.nanoTime();
            metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
            requestMeta.setStartApplyNanos(startApplyNanos);
            // If the current req is the one that should be processed next, apply this event
            // through DataRegionStateMachine.
            TPipeConsensusTransferResp resp = loadEvent(req);

            // The req is removed from the buffer and onSyncedReplicateIndex is updated only
            // when the event is applied successfully and the payload is not a TsFilePiece.
            // Because the pipe transfers multiple reqs with the same commitId for a single
            // TsFileInsertionEvent, the event can only be discarded once the last seal req
            // has been applied.
            if (resp != null
                && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
              onSuccess(tCommitId, isTransferTsFileSeal);
            }
            return resp;
          }

          if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getIotConsensusV2PipelineSize()
              && reqExecutionOrderBuffer.first().equals(requestMeta)) {
            // TODO: Turn it to debug after GA
            LOGGER.info(
                "PipeConsensus-PipeName-{}: no.{} event get executed because receiver buffer's len >= pipeline, current receiver syncIndex {}, current buffer len {}",
                consensusPipeName,
                tCommitId,
                onSyncedReplicateIndex,
                reqExecutionOrderBuffer.size());
            long startApplyNanos = System.nanoTime();
            metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
            requestMeta.setStartApplyNanos(startApplyNanos);
            // If the reqBuffer is full and its peek is held by the current thread, load this event.
            TPipeConsensusTransferResp resp = loadEvent(req);

            if (resp != null
                && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
              onSuccess(tCommitId, isTransferTsFileSeal);
            }
            return resp;
          } else {
            // If the req is not supposed to be processed yet and the reqBuffer is not full,
            // the current thread should wait until the reqBuffer fills up, which indicates
            // that the receiver has received all requests from the connector without
            // duplication or loss.
            try {
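              // condition.await returns false when the waiting time elapses, so 'timeout'
              // is true only if no signal arrived within the maximum waiting time.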
              boolean timeout =
                  !condition.await(
                      PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS, TimeUnit.MILLISECONDS);

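              // Re-check the closed flag after waking up, since the receiver may have been
              // closed while this thread was waiting.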
              if (isClosed.get()) {
                return PipeConsensusReceiverAgent.closedResp(
                    consensusPipeName.toString(), req.getCommitId());
              }
              // If a req finds that the buffer no longer contains its requestMeta after
              // waking from condition.await, some reqs with newer pipeTaskRestartTimes or
              // rebootTimes may have arrived during the wait and refreshed the buffer. In
              // that case these stale requests must be discarded.
              if (!reqExecutionOrderBuffer.contains(requestMeta)) {
                return deprecatedResp(String.format("%s or %s", THIS_NODE, PIPE_TASK));
              }
              // If the buffer is still not full after the wait times out, we assume the
              // sender has no more events to send at this time, i.e. it has sent all events.
              // At this point we apply the event at the reqBuffer's peek.
              if (timeout
                  && reqExecutionOrderBuffer.size() < IOTDB_CONFIG.getIotConsensusV2PipelineSize()
                  && reqExecutionOrderBuffer.first() != null
                  && reqExecutionOrderBuffer.first().equals(requestMeta)) {
                // TODO: Turn it to debug after GA
                LOGGER.info(
                    "PipeConsensus-PipeName-{}: no.{} event get executed after awaiting timeout, current receiver syncIndex: {}",
                    consensusPipeName,
                    tCommitId,
                    onSyncedReplicateIndex);
                long startApplyNanos = System.nanoTime();
                metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
                requestMeta.setStartApplyNanos(startApplyNanos);
                TPipeConsensusTransferResp resp = loadEvent(req);

                if (resp != null
                    && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                  onSuccess(tCommitId, isTransferTsFileSeal);
                }
                return resp;
              }
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              LOGGER.warn(
                  "PipeConsensus-PipeName-{}: current waiting is interrupted. onSyncedCommitIndex: {}. Exception: ",
                  consensusPipeName,
                  tCommitId.getReplicateIndex(),
                  e);
              // Avoid infinite loop when RPC thread is killed by OS
              return new TPipeConsensusTransferResp(
                  RpcUtils.getStatus(
                      TSStatusCode.SHUT_DOWN_ERROR,
                      "RPC processor is interrupted by shutdown hook when wait on condition!"));
            }
          }
        }
      } finally {
        // Wake all threads that may still be awaiting so they become active again and contend
        // for the lock, instead of sleeping pointlessly on the condition after this thread
        // releases it.
        condition.signalAll();
        lock.unlock();
      }
    }
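
For reference, the polling loop above admits exactly three apply paths. The following minimal sketch distills that dispatch decision into a pure function; it is not part of the IoTDB codebase, and every name in it (DispatchSketch, DispatchDecision, decide, and the parameters) is hypothetical:

    final class DispatchSketch {
      // Hypothetical restatement of the three apply paths in onRequest's polling loop.
      enum DispatchDecision { APPLY_IN_ORDER, APPLY_FULL_BUFFER, APPLY_AFTER_TIMEOUT, KEEP_WAITING }

      static DispatchDecision decide(
          final boolean isBufferPeek,     // this request equals reqExecutionOrderBuffer.first()
          final long replicateIndex,      // replicate index carried by the incoming request
          final long onSyncedIndex,       // last replicate index applied by this receiver
          final int bufferSize,           // current size of reqExecutionOrderBuffer
          final int pipelineSize,         // IOTDB_CONFIG.getIotConsensusV2PipelineSize()
          final boolean awaitTimedOut) {  // condition.await elapsed without a signal
        if (isBufferPeek && replicateIndex == onSyncedIndex + 1) {
          return DispatchDecision.APPLY_IN_ORDER;      // strictly the next event in order
        }
        if (isBufferPeek && bufferSize >= pipelineSize) {
          return DispatchDecision.APPLY_FULL_BUFFER;   // pipeline saturated: drain from the peek
        }
        if (isBufferPeek && awaitTimedOut && bufferSize < pipelineSize) {
          return DispatchDecision.APPLY_AFTER_TIMEOUT; // sender presumed to have sent everything
        }
        return DispatchDecision.KEEP_WAITING;          // go back to condition.await
      }
    }

Read this only as a restatement of the control flow: in the real method each apply path additionally records the dispatch-waiting timer, applies the event via loadEvent, and, on a successful non-piece response, advances the synced index through onSuccess.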