in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/client/IoTDBSyncClient.java [100:157]
public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws TException {
final int bodySizeLimit = (int) (PipeRuntimeOptions.THRIFT_FRAME_MAX_SIZE.value() * 0.8);
if (req.getVersion() != IoTDBConnectorRequestVersion.VERSION_1.getVersion()
|| req.body.limit() < bodySizeLimit) {
return super.pipeTransfer(req);
}
LOGGER.warn(
"The body size of the request is too large. The request will be sliced. Origin req: {}-{}. "
+ "Request body size: {}, threshold: {}",
req.getVersion(),
req.getType(),
req.body.limit(),
bodySizeLimit);
try {
final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
// Slice the buffer to avoid the buffer being too large
final int sliceCount =
req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1);
for (int i = 0; i < sliceCount; ++i) {
final int startIndexInBody = i * bodySizeLimit;
final int endIndexInBody = Math.min((i + 1) * bodySizeLimit, req.body.limit());
final TPipeTransferResp sliceResp =
super.pipeTransfer(
PipeTransferSliceReq.toTPipeTransferReq(
sliceOrderId,
req.getType(),
i,
sliceCount,
req.body.duplicate(),
startIndexInBody,
endIndexInBody));
if (i == sliceCount - 1) {
return sliceResp;
}
if (sliceResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeConnectionException(
String.format(
"Failed to transfer slice. Origin req: %s-%s, slice index: %d, slice count: %d. Reason: %s",
req.getVersion(), req.getType(), i, sliceCount, sliceResp.getStatus()));
}
}
// Should not reach here
return super.pipeTransfer(req);
} catch (final Exception e) {
LOGGER.warn(
"Failed to transfer slice. Origin req: {}-{}. Retry the whole transfer.",
req.getVersion(),
req.getType(),
e);
// Fall back to the original behavior
return super.pipeTransfer(req);
}
}