public TPipeTransferResp pipeTransfer()

in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/client/IoTDBSyncClient.java [100:157]


  /**
   * Transfers a pipe request, splitting its body into several slice requests when the body is at
   * least 80% of {@code PipeRuntimeOptions.THRIFT_FRAME_MAX_SIZE}.
   *
   * <p>Requests whose version is not {@code VERSION_1}, or whose body is below the threshold, are
   * forwarded unchanged to {@code super.pipeTransfer}. Otherwise each slice is sent in order with
   * a shared slice order id. A non-success status on any slice except the last one aborts the
   * slicing; that failure, like any other exception raised while slicing, is logged and the
   * original un-sliced request is then sent as a fallback.
   *
   * @param req the request to transfer
   * @return the response of the last slice when slicing completes, otherwise the response of the
   *     un-sliced transfer
   * @throws TException propagated from {@code super.pipeTransfer} on the un-sliced paths
   */
  public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws TException {
    final int bodySizeLimit = (int) (PipeRuntimeOptions.THRIFT_FRAME_MAX_SIZE.value() * 0.8);
    // A non-positive limit cannot be used as a slice size (division by zero / no slices), so
    // send the request as-is instead of relying on the exception fallback below.
    if (bodySizeLimit <= 0
        || req.getVersion() != IoTDBConnectorRequestVersion.VERSION_1.getVersion()
        || req.body.limit() < bodySizeLimit) {
      return super.pipeTransfer(req);
    }

    final int bodySize = req.body.limit();

    LOGGER.warn(
        "The body size of the request is too large. The request will be sliced. Origin req: {}-{}. "
            + "Request body size: {}, threshold: {}",
        req.getVersion(),
        req.getType(),
        bodySize,
        bodySizeLimit);

    try {
      final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
      // Slice the buffer to avoid the buffer being too large (ceiling division)
      final int sliceCount = bodySize / bodySizeLimit + (bodySize % bodySizeLimit == 0 ? 0 : 1);
      for (int i = 0; i < sliceCount; ++i) {
        // i < sliceCount guarantees i * bodySizeLimit < bodySize, so this product fits in an int.
        final int startIndexInBody = i * bodySizeLimit;
        // Compute the end from the remaining bytes. (i + 1) * bodySizeLimit may exceed
        // Integer.MAX_VALUE on the last slice of a very large body and wrap to a negative value.
        final int endIndexInBody =
            startIndexInBody + Math.min(bodySizeLimit, bodySize - startIndexInBody);
        final TPipeTransferResp sliceResp =
            super.pipeTransfer(
                PipeTransferSliceReq.toTPipeTransferReq(
                    sliceOrderId,
                    req.getType(),
                    i,
                    sliceCount,
                    // Hand over a duplicate so the original buffer is left untouched for the fallback
                    req.body.duplicate(),
                    startIndexInBody,
                    endIndexInBody));

        // The response of the last slice is returned to the caller as-is, whatever its status.
        if (i == sliceCount - 1) {
          return sliceResp;
        }

        if (sliceResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          throw new PipeConnectionException(
              String.format(
                  "Failed to transfer slice. Origin req: %s-%s, slice index: %d, slice count: %d. Reason: %s",
                  req.getVersion(), req.getType(), i, sliceCount, sliceResp.getStatus()));
        }
      }

      // Should not reach here
      return super.pipeTransfer(req);
    } catch (final Exception e) {
      LOGGER.warn(
          "Failed to transfer slice. Origin req: {}-{}. Retry the whole transfer.",
          req.getVersion(),
          req.getType(),
          e);
      // Fall back to the original behavior
      return super.pipeTransfer(req);
    }
  }