public void sendHandshakeReq()

in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/client/IoTDBSyncClientManager.java [202:263]


  public void sendHandshakeReq(final Pair<IoTDBSyncClient, Boolean> clientAndStatus) {
    final IoTDBSyncClient client = clientAndStatus.getLeft();
    try {
      final HashMap<String, String> params = new HashMap<>();
      params.put(
          PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
          PipeRuntimeOptions.TIMESTAMP_PRECISION.value());
      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, getClusterId());
      params.put(
          PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
          Boolean.toString(shouldReceiverConvertOnTypeMismatch));
      params.put(
          PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy);
      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
      params.put(
          PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
          Boolean.toString(validateTsFile));
      params.put(
          PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
          Boolean.toString(shouldMarkAsPipeRequest));

      // Try to handshake by PipeTransferHandshakeV2Req.
      TPipeTransferResp resp = client.pipeTransfer(buildHandshakeV2Req(params));
      // Receiver may be an old version, so we need to retry to handshake by
      // PipeTransferHandshakeV1Req.
      if (resp.getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
        LOGGER.warn(
            "Handshake error with target server ip: {}, port: {}, because: {}. "
                + "Retry to handshake by PipeTransferHandshakeV1Req.",
            client.getIpAddress(),
            client.getPort(),
            resp.getStatus());
        supportModsIfIsDataNodeReceiver = false;
        resp = client.pipeTransfer(buildHandshakeV1Req());
      }

      if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
        LOGGER.warn(
            "Handshake error with target server ip: {}, port: {}, because: {}.",
            client.getIpAddress(),
            client.getPort(),
            resp.getStatus());
        endPoint2HandshakeErrorMessage.put(client.getEndPoint(), resp.getStatus().getMessage());
      } else {
        clientAndStatus.setRight(true);
        client.setTimeout(CONNECTION_TIMEOUT_MS.get());
        LOGGER.info(
            "Handshake success. Target server ip: {}, port: {}",
            client.getIpAddress(),
            client.getPort());
      }
    } catch (Exception e) {
      LOGGER.warn(
          "Handshake error with target server ip: {}, port: {}, because: {}.",
          client.getIpAddress(),
          client.getPort(),
          e.getMessage(),
          e);
      endPoint2HandshakeErrorMessage.put(client.getEndPoint(), e.getMessage());
    }
  }