private void doTransfer()

in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java [294:355]


  private void doTransfer(final File tsFile, final File modFile, final String dataBaseName)
      throws PipeException, IOException {

    final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();
    final TPipeTransferResp resp;

    // 1. Transfer tsFile, and mod file if exists and receiver's version >= 2
    if (Objects.nonNull(modFile) && clientManager.supportModsIfIsDataNodeReceiver()) {
      transferFilePieces(modFile, clientAndStatus, true);
      transferFilePieces(tsFile, clientAndStatus, true);

      // 2. Transfer file seal signal with mod, which means the file is transferred completely
      try {
        final TPipeTransferReq req =
            compressIfNeeded(
                PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                    modFile.getName(),
                    modFile.length(),
                    tsFile.getName(),
                    tsFile.length(),
                    dataBaseName));

        resp = clientAndStatus.getLeft().pipeTransfer(req);
      } catch (final Exception e) {
        clientAndStatus.setRight(false);
        clientManager.adjustTimeoutIfNecessary(e);
        throw new PipeConnectionException(
            String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
            e);
      }
    } else {
      transferFilePieces(tsFile, clientAndStatus, false);

      // 2. Transfer file seal signal without mod, which means the file is transferred completely
      try {
        final TPipeTransferReq req =
            compressIfNeeded(
                PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                    tsFile.getName(), tsFile.length(), dataBaseName));

        resp = clientAndStatus.getLeft().pipeTransfer(req);
      } catch (final Exception e) {
        clientAndStatus.setRight(false);
        clientManager.adjustTimeoutIfNecessary(e);
        throw new PipeConnectionException(
            String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
            e);
      }
    }

    final TSStatus status = resp.getStatus();
    // Only handle the failed statuses to avoid string format performance overhead
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
        && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
      receiverStatusHandler.handle(
          resp.getStatus(),
          String.format("Seal file %s error, result status %s.", tsFile, resp.getStatus()),
          tsFile.getName());
    }

    LOGGER.info("Successfully transferred file {}.", tsFile);
  }