in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java [198:441]
public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
  // Entry point for every pipe transfer request handled by this receiver.
  //
  // The request is routed to the handler matching its type code. For each recognized type the
  // time elapsed since the start of this call (in nanoseconds) is reported to
  // PipeDataNodeReceiverMetrics in a finally block, i.e. whether the handler returns or throws.
  // An unrecognized type code yields a PIPE_TYPE_ERROR response; any exception is logged and
  // turned into a PIPE_ERROR response instead of being propagated to the caller.
  try {
    final long beginNanos = System.nanoTime();
    final short typeCode = req.getType();
    if (!PipeRequestType.isValidatedRequestType(typeCode)) {
      return respondToUnknownRequestType(typeCode);
    }

    final PipeRequestType type = PipeRequestType.valueOf(typeCode);
    // Only TRANSFER_SLICE requests leave the state held by sliceReqHandler in place;
    // every other request type resets it before being handled.
    if (type != PipeRequestType.TRANSFER_SLICE) {
      sliceReqHandler.clear();
    }

    switch (type) {
      case HANDSHAKE_DATANODE_V1:
        try {
          return handleTransferHandshakeV1(
              PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordHandshakeDatanodeV1Timer(System.nanoTime() - beginNanos);
        }
      case HANDSHAKE_DATANODE_V2:
        try {
          return handleTransferHandshakeV2(
              PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordHandshakeDatanodeV2Timer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TABLET_INSERT_NODE:
        try {
          return handleTransferTabletInsertNode(
              PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTabletInsertNodeTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TABLET_INSERT_NODE_V2:
        try {
          return handleTransferTabletInsertNode(
              PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTabletInsertNodeV2Timer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TABLET_RAW:
        try {
          return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTabletRawTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TABLET_RAW_V2:
        try {
          return handleTransferTabletRaw(PipeTransferTabletRawReqV2.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTabletRawV2Timer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TABLET_BINARY:
        try {
          return handleTransferTabletBinary(
              PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTabletBinaryTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TABLET_BINARY_V2:
        try {
          return handleTransferTabletBinary(
              PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTabletBinaryV2Timer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TABLET_BATCH:
        try {
          return handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTabletBatchTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TABLET_BATCH_V2:
        try {
          return handleTransferTabletBatchV2(
              PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTabletBatchV2Timer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TS_FILE_PIECE:
        try {
          final boolean viaAirGap = req instanceof AirGapPseudoTPipeTransferRequest;
          return handleTransferFilePiece(
              PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), viaAirGap, true);
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTsFilePieceTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TS_FILE_SEAL:
        try {
          return handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTsFileSealTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TS_FILE_PIECE_WITH_MOD:
        try {
          final boolean viaAirGap = req instanceof AirGapPseudoTPipeTransferRequest;
          return handleTransferFilePiece(
              PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), viaAirGap, false);
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTsFilePieceWithModTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_TS_FILE_SEAL_WITH_MOD:
        try {
          return handleTransferFileSealV2(
              PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferTsFileSealWithModTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_PLAN_NODE:
        try {
          return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferSchemaPlanTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_SCHEMA_SNAPSHOT_PIECE:
        try {
          final boolean viaAirGap = req instanceof AirGapPseudoTPipeTransferRequest;
          return handleTransferFilePiece(
              PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), viaAirGap, false);
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_SCHEMA_SNAPSHOT_SEAL:
        try {
          return handleTransferFileSealV2(
              PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - beginNanos);
        }
      case HANDSHAKE_CONFIGNODE_V1:
      case HANDSHAKE_CONFIGNODE_V2:
      case TRANSFER_CONFIG_PLAN:
      case TRANSFER_CONFIG_SNAPSHOT_PIECE:
      case TRANSFER_CONFIG_SNAPSHOT_SEAL:
        try {
          // Config requests will first be received by the DataNode receiver,
          // then transferred to ConfigNode receiver to execute.
          return handleTransferConfigPlan(req);
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferConfigPlanTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_SLICE:
        try {
          return handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferSliceTimer(System.nanoTime() - beginNanos);
        }
      case TRANSFER_COMPRESSED:
        try {
          // Re-enters this method with the request produced by PipeTransferCompressedReq
          // (presumably the decompressed payload - confirm against that class).
          return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
        } finally {
          PipeDataNodeReceiverMetrics.getInstance()
              .recordTransferCompressedTimer(System.nanoTime() - beginNanos);
        }
      default:
        // A validated type code with no handler here is treated like an unknown type.
        return respondToUnknownRequestType(typeCode);
    }
  } catch (final Exception e) {
    final String failure =
        String.format("Exception %s encountered while handling request %s.", e.getMessage(), req);
    LOGGER.warn("Receiver id = {}: {}", receiverId.get(), failure, e);
    return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, failure));
  }
}

/**
 * Logs a warning and builds the {@code PIPE_TYPE_ERROR} response returned when this receiver has
 * no handler for the given request type code.
 *
 * <p>Unknown request type, which means the request can not be handled by this receiver, maybe the
 * version of the receiver is not compatible with the sender.
 *
 * @param typeCode the raw type code taken from the request
 * @return a response carrying the {@code PIPE_TYPE_ERROR} status
 */
private TPipeTransferResp respondToUnknownRequestType(final short typeCode) {
  final TSStatus unknownTypeStatus =
      RpcUtils.getStatus(
          TSStatusCode.PIPE_TYPE_ERROR, String.format("Unknown PipeRequestType %s.", typeCode));
  LOGGER.warn(
      "Receiver id = {}: Unknown PipeRequestType, response status = {}.",
      receiverId.get(),
      unknownTypeStatus);
  return new TPipeTransferResp(unknownTypeStatus);
}