in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBSslSyncConnector.java [150:215]
/**
 * Sends the content of {@code file} to the receiver piece by piece.
 *
 * <p>The file is read sequentially into a buffer whose size comes from {@code
 * PipeRuntimeOptions.PIPE_CONNECTOR_READ_FILE_BUFFER_SIZE}. Each chunk is wrapped into a
 * single-file or multi-file piece request (depending on {@code isMultiFile}), passed through
 * {@code compressIfNeeded} and transferred with the client held in {@code clientAndStatus}.
 *
 * @param file the file to transfer; its name is used to identify the pieces
 * @param clientAndStatus the client used for the transfer together with its status flag; the
 *     flag is set to {@code false} when building or sending a piece fails
 * @param isMultiFile whether the multi-file piece request is used instead of the single-file one
 * @throws PipeException if a piece cannot be built or sent (thrown here as a {@code
 *     PipeConnectionException} wrapping the cause)
 * @throws IOException if reading from or seeking in {@code file} fails
 */
protected void transferFilePieces(
    final File file,
    final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
    final boolean isMultiFile)
    throws PipeException, IOException {
  final int bufferCapacity = PipeRuntimeOptions.PIPE_CONNECTOR_READ_FILE_BUFFER_SIZE.value();
  final byte[] buffer = new byte[bufferCapacity];
  long offset = 0;

  try (final RandomAccessFile input = new RandomAccessFile(file, "r")) {
    int bytesRead;
    while ((bytesRead = input.read(buffer)) != -1) {
      // A completely filled buffer is sent as is; a partial read is trimmed to its real length.
      final byte[] chunk;
      if (bytesRead == bufferCapacity) {
        chunk = buffer;
      } else {
        chunk = Arrays.copyOfRange(buffer, 0, bytesRead);
      }

      final PipeTransferFilePieceResp pieceResp;
      try {
        final TPipeTransferReq pieceReq =
            compressIfNeeded(
                isMultiFile
                    ? getTransferMultiFilePieceReq(file.getName(), offset, chunk)
                    : getTransferSingleFilePieceReq(file.getName(), offset, chunk));
        pieceResp =
            PipeTransferFilePieceResp.fromTPipeTransferResp(
                clientAndStatus.getLeft().pipeTransfer(pieceReq));
      } catch (final Exception e) {
        // Any failure while building or sending the piece marks the client as unusable.
        clientAndStatus.setRight(false);
        throw new PipeConnectionException(
            String.format(
                "Network error when transfer file %s, because %s.", file, e.getMessage()),
            e);
      }

      final TSStatus status = pieceResp.getStatus();
      final int statusCode = status.getCode();

      if (statusCode == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
        // Happens only after the connection broke and the connector reconnected: the receiver
        // reports the offset it has written up to, so reading resumes from exactly there.
        offset = pieceResp.getEndWritingOffset();
        input.seek(offset);
        LOGGER.info("Redirect file position to {}.", offset);
      } else {
        // No redirection requested: the next piece starts right after this chunk.
        offset += bytesRead;

        // The receiver asks for a handshake first; send it, the event is re-transferred later.
        if (statusCode == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
          clientManager.sendHandshakeReq(clientAndStatus);
        }

        // Format the message only for failed statuses to avoid needless string formatting.
        final boolean accepted =
            statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
                || statusCode == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
        if (!accepted) {
          receiverStatusHandler.handle(
              status,
              String.format("Transfer file %s error, result status %s.", file, status),
              file.getName());
        }
      }
    }
  }
}