in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/client/IoTDBSyncClientManager.java [202:263]
/**
 * Performs the handshake with the receiver behind the given client and records the outcome.
 *
 * <p>A V2 handshake request carrying the sender-side parameters is sent first. If the receiver
 * answers with {@code PIPE_TYPE_ERROR}, {@code supportModsIfIsDataNodeReceiver} is set to
 * {@code false} and a V1 handshake request is sent instead. When the final response is a success,
 * the right element of {@code clientAndStatus} is set to {@code true} and the client timeout is
 * set to {@code CONNECTION_TIMEOUT_MS}. Otherwise a warning is logged and a failure reason is
 * stored in {@code endPoint2HandshakeErrorMessage} under the client's endpoint.
 *
 * <p>Exceptions raised while handshaking are caught, logged and recorded in the same map rather
 * than propagated. The recorded reason is never {@code null}.
 *
 * @param clientAndStatus the client to handshake with (left) and its status flag (right), which
 *     is set to {@code true} only when the handshake succeeds
 */
public void sendHandshakeReq(final Pair<IoTDBSyncClient, Boolean> clientAndStatus) {
  final IoTDBSyncClient client = clientAndStatus.getLeft();
  try {
    final HashMap<String, String> params = new HashMap<>();
    params.put(
        PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
        PipeRuntimeOptions.TIMESTAMP_PRECISION.value());
    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, getClusterId());
    params.put(
        PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
        Boolean.toString(shouldReceiverConvertOnTypeMismatch));
    params.put(
        PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy);
    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
    params.put(
        PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
        Boolean.toString(validateTsFile));
    params.put(
        PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
        Boolean.toString(shouldMarkAsPipeRequest));

    // Try to handshake by PipeTransferHandshakeV2Req.
    TPipeTransferResp resp = client.pipeTransfer(buildHandshakeV2Req(params));
    // Receiver may be an old version, so we need to retry to handshake by
    // PipeTransferHandshakeV1Req.
    if (resp.getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
      LOGGER.warn(
          "Handshake error with target server ip: {}, port: {}, because: {}. "
              + "Retry to handshake by PipeTransferHandshakeV1Req.",
          client.getIpAddress(),
          client.getPort(),
          resp.getStatus());
      supportModsIfIsDataNodeReceiver = false;
      resp = client.pipeTransfer(buildHandshakeV1Req());
    }

    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      LOGGER.warn(
          "Handshake error with target server ip: {}, port: {}, because: {}.",
          client.getIpAddress(),
          client.getPort(),
          resp.getStatus());
      // The status message may be absent; fall back to the whole status so that a non-null
      // reason is always recorded.
      // NOTE(review): the map is declared outside this block; if it is a ConcurrentHashMap, a
      // null value would make put() throw NullPointerException - confirm.
      final String statusMessage = resp.getStatus().getMessage();
      endPoint2HandshakeErrorMessage.put(
          client.getEndPoint(),
          statusMessage != null ? statusMessage : String.valueOf(resp.getStatus()));
    } else {
      clientAndStatus.setRight(true);
      client.setTimeout(CONNECTION_TIMEOUT_MS.get());
      LOGGER.info(
          "Handshake success. Target server ip: {}, port: {}",
          client.getIpAddress(),
          client.getPort());
    }
  } catch (Exception e) {
    // Throwable#getMessage() may return null; Throwable#toString() still names the exception
    // class, so the logged and recorded reason stays meaningful and non-null.
    final String reason = e.getMessage() != null ? e.getMessage() : e.toString();
    LOGGER.warn(
        "Handshake error with target server ip: {}, port: {}, because: {}.",
        client.getIpAddress(),
        client.getPort(),
        reason,
        e);
    endPoint2HandshakeErrorMessage.put(client.getEndPoint(), reason);
  }
}