in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBDataRegionSyncConnector.java [294:355]
/**
 * Sends a TsFile (and, when applicable, its mod file) to the receiver piece by piece and then
 * sends the seal request that marks the transfer as complete.
 *
 * <p>The mod file is only sent when {@code modFile} is non-null and the client manager reports
 * that the receiver supports mods; otherwise only the TsFile is transferred.
 *
 * @param tsFile the TsFile to transfer
 * @param modFile the accompanying mod file, may be {@code null}
 * @param dataBaseName the database name carried in the seal request
 * @throws PipeException declared by this method; a {@code PipeConnectionException} is raised when
 *     building or sending the seal request fails
 * @throws IOException declared by this method (presumably from the piece transfer — confirm)
 */
private void doTransfer(final File tsFile, final File modFile, final String dataBaseName)
    throws PipeException, IOException {
  final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();

  // Decide once whether the mod file travels together with the TsFile.
  final boolean withMod =
      Objects.nonNull(modFile) && clientManager.supportModsIfIsDataNodeReceiver();

  // 1. Transfer the file pieces: mod file first (if any), then the TsFile.
  if (withMod) {
    transferFilePieces(modFile, clientAndStatus, true);
  }
  transferFilePieces(tsFile, clientAndStatus, withMod);

  // 2. Transfer the file seal signal, which means the file is transferred completely.
  final TPipeTransferResp resp;
  try {
    final TPipeTransferReq sealReq;
    if (withMod) {
      sealReq =
          compressIfNeeded(
              PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                  modFile.getName(),
                  modFile.length(),
                  tsFile.getName(),
                  tsFile.length(),
                  dataBaseName));
    } else {
      sealReq =
          compressIfNeeded(
              PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                  tsFile.getName(), tsFile.length(), dataBaseName));
    }
    resp = clientAndStatus.getLeft().pipeTransfer(sealReq);
  } catch (final Exception e) {
    // Flag the client status as false and let the manager react to the failure before rethrowing.
    clientAndStatus.setRight(false);
    clientManager.adjustTimeoutIfNecessary(e);
    throw new PipeConnectionException(
        String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
        e);
  }

  // 3. Only failed statuses are handed to the handler, so the message is formatted lazily.
  final TSStatus status = resp.getStatus();
  final int code = status.getCode();
  final boolean accepted =
      code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
          || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
  if (!accepted) {
    receiverStatusHandler.handle(
        status,
        String.format("Seal file %s error, result status %s.", tsFile, status),
        tsFile.getName());
  }

  // NOTE(review): this line is also reached when the handler above returns normally for a
  // non-success status — confirm that logging success in that case is intended.
  LOGGER.info("Successfully transferred file {}.", tsFile);
}