protected void transferFilePieces()

in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java [150:215]


  /**
   * Sends the content of {@code file} to the receiver as a sequence of file-piece requests.
   *
   * <p>The file is read in chunks of at most {@code PIPE_CONNECTOR_READ_FILE_BUFFER_SIZE} bytes.
   * Each chunk is wrapped in a multi-file or single-file piece request (chosen by {@code
   * isMultiFile}), passed through {@code compressIfNeeded}, and sent with the client held in
   * {@code clientAndStatus}. When the receiver answers with {@code
   * PIPE_TRANSFER_FILE_OFFSET_RESET}, reading resumes from the offset reported in the response.
   *
   * @param file the file whose bytes are transferred; its name is put into every piece request
   * @param clientAndStatus the client used for sending (left) and a flag (right) that this method
   *     sets to {@code false} when building or sending a piece request throws
   * @param isMultiFile {@code true} to build multi-file piece requests, {@code false} to build
   *     single-file piece requests
   * @throws PipeException declared by the signature; a {@code PipeConnectionException} is thrown
   *     here when building or sending a piece request fails
   * @throws IOException if opening, reading or seeking the file fails
   */
  protected void transferFilePieces(
      final File file,
      final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
      final boolean isMultiFile)
      throws PipeException, IOException {
    final int bufferSize = PipeRuntimeOptions.PIPE_CONNECTOR_READ_FILE_BUFFER_SIZE.value();
    final byte[] buffer = new byte[bufferSize];
    long offset = 0;
    try (final RandomAccessFile input = new RandomAccessFile(file, "r")) {
      int bytesRead;
      while ((bytesRead = input.read(buffer)) != -1) {
        // A completely filled buffer is sent as-is; a partial read is trimmed to its length.
        final byte[] piece;
        if (bytesRead == bufferSize) {
          piece = buffer;
        } else {
          piece = Arrays.copyOfRange(buffer, 0, bytesRead);
        }

        final PipeTransferFilePieceResp response;
        try {
          final TPipeTransferReq request =
              compressIfNeeded(
                  isMultiFile
                      ? getTransferMultiFilePieceReq(file.getName(), offset, piece)
                      : getTransferSingleFilePieceReq(file.getName(), offset, piece));
          response =
              PipeTransferFilePieceResp.fromTPipeTransferResp(
                  clientAndStatus.getLeft().pipeTransfer(request));
        } catch (final Exception e) {
          // Flag the client before surfacing the failure as a connection exception.
          clientAndStatus.setRight(false);
          throw new PipeConnectionException(
              String.format(
                  "Network error when transfer file %s, because %s.", file, e.getMessage()),
              e);
        }

        offset += bytesRead;

        final TSStatus status = response.getStatus();
        final int statusCode = status.getCode();

        // The receiver asks for a different offset (seen after a broken connection was
        // re-established): jump there and read the next piece from that position.
        if (statusCode == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
          offset = response.getEndWritingOffset();
          input.seek(offset);
          LOGGER.info("Redirect file position to {}.", offset);
          continue;
        }

        // The receiver requires a handshake first; the status is still passed to the handler
        // below because it is neither a success nor a redirection recommendation.
        if (statusCode == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
          clientManager.sendHandshakeReq(clientAndStatus);
        }

        // Build the formatted message only for non-accepted statuses to keep the hot path cheap.
        final boolean accepted =
            statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
                || statusCode == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
        if (!accepted) {
          receiverStatusHandler.handle(
              response.getStatus(),
              String.format(
                  "Transfer file %s error, result status %s.", file, response.getStatus()),
              file.getName());
        }
      }
    }
  }