public void start()

in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [119:165]


  /**
   * Configures this task from the given connector properties.
   *
   * <p>The properties are parsed with the config definition of {@code CloudPubSubSinkConnector}
   * and the resulting values are copied into this task's fields. Credentials are then loaded
   * when either a credentials file path or a credentials JSON string is configured (the file
   * path is checked first, and the JSON string is only consulted when no path is set). Finally a
   * publisher is created if none is set yet.
   *
   * @param props raw configuration properties for this task
   * @throws RuntimeException wrapping the {@link IOException} raised while loading credentials
   */
  public void start(Map<String, String> props) {
    Map<String, Object> settings = new CloudPubSubSinkConnector().config().parse(props);

    // Target Cloud Pub/Sub coordinates.
    cpsProject = settings.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
    cpsTopic = settings.get(ConnectorUtils.CPS_TOPIC_CONFIG).toString();
    cpsEndpoint = settings.get(ConnectorUtils.CPS_ENDPOINT).toString();

    // Buffering, flow-control and timeout settings.
    maxBufferSize = (Integer) settings.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG);
    maxBufferBytes = (Long) settings.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG);
    maxOutstandingRequestBytes =
        (Long) settings.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_REQUEST_BYTES);
    maxOutstandingMessages = (Long) settings.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_MESSAGES);
    maxDelayThresholdMs = (Integer) settings.get(CloudPubSubSinkConnector.MAX_DELAY_THRESHOLD_MS);
    maxRequestTimeoutMs = (Integer) settings.get(CloudPubSubSinkConnector.MAX_REQUEST_TIMEOUT_MS);
    maxTotalTimeoutMs = (Integer) settings.get(CloudPubSubSinkConnector.MAX_TOTAL_TIMEOUT_MS);
    maxShutdownTimeoutMs = (Integer) settings.get(CloudPubSubSinkConnector.MAX_SHUTDOWN_TIMEOUT_MS);

    // Message-shaping settings.
    messageBodyName = (String) settings.get(CloudPubSubSinkConnector.CPS_MESSAGE_BODY_NAME);
    includeMetadata = (Boolean) settings.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_METADATA);
    includeHeaders = (Boolean) settings.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_HEADERS);
    String orderingKeyName = (String) settings.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE);
    orderingKeySource = OrderingKeySource.getEnum(orderingKeyName);

    // Credentials: a configured file path wins over inline JSON; with neither set, the freshly
    // constructed provider is left untouched.
    gcpCredentialsProvider = new ConnectorCredentialsProvider();
    String keyFilePath = (String) settings.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
    String keyJson = (String) settings.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
    try {
      if (keyFilePath != null) {
        gcpCredentialsProvider.loadFromFile(keyFilePath);
      } else if (keyJson != null) {
        gcpCredentialsProvider.loadJson(keyJson);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    // Skip creation when a publisher is already present (e.g. one supplied via the constructor).
    if (publisher == null) {
      createPublisher();
    }
    log.info("Start CloudPubSubSinkTask");
  }