public void start()

in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java [94:182]


  public void start(Map<String, String> props) {
    Map<String, Object> validatedProps = new CloudPubSubSourceConnector().config().parse(props);
    cpsSubscription = ProjectSubscriptionName.newBuilder()
        .setProject(validatedProps.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString())
        .setSubscription(
            validatedProps.get(CloudPubSubSourceConnector.CPS_SUBSCRIPTION_CONFIG).toString())
        .build();
    String cpsEndpoint = (String) validatedProps.get(ConnectorUtils.CPS_ENDPOINT);
    kafkaTopic = validatedProps.get(CloudPubSubSourceConnector.KAFKA_TOPIC_CONFIG).toString();
    int cpsMaxBatchSize = (Integer) validatedProps
        .get(CloudPubSubSourceConnector.CPS_MAX_BATCH_SIZE_CONFIG);
    kafkaPartitions =
        (Integer) validatedProps.get(CloudPubSubSourceConnector.KAFKA_PARTITIONS_CONFIG);
    kafkaMessageKeyAttribute =
        (String) validatedProps.get(CloudPubSubSourceConnector.KAFKA_MESSAGE_KEY_CONFIG);
    kafkaMessageTimestampAttribute =
        (String) validatedProps.get(CloudPubSubSourceConnector.KAFKA_MESSAGE_TIMESTAMP_CONFIG);
    kafkaPartitionScheme =
        PartitionScheme.getEnum(
            (String) validatedProps.get(CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG));
    useKafkaHeaders = (Boolean) validatedProps.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS);
    makeOrderingKeyAttribute =
        (Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_MAKE_ORDERING_KEY_ATTRIBUTE);
    ConnectorCredentialsProvider gcpCredentialsProvider = new ConnectorCredentialsProvider();
    String gcpCredentialsFilePath = (String) validatedProps
        .get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
    String credentialsJson = (String) validatedProps
        .get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
    boolean useStreamingPull = (Boolean) validatedProps
        .get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_ENABLED);
    long streamingPullBytes = (Long) validatedProps
        .get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_FLOW_CONTROL_BYTES);
    long streamingPullMessages = (Long) validatedProps
        .get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_FLOW_CONTROL_MESSAGES);
    int streamingPullParallelStreams = (Integer) validatedProps
        .get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_PARALLEL_STREAMS);
    long streamingPullMaxAckDeadlineMs = (Long) validatedProps
        .get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_ACK_EXTENSION_MS);
    long streamingPullMaxMsPerAckDeadlineExtension = (Long) validatedProps
        .get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION);
    if (gcpCredentialsFilePath != null) {
      try {
        gcpCredentialsProvider.loadFromFile(gcpCredentialsFilePath);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    } else if (credentialsJson != null) {
      try {
        gcpCredentialsProvider.loadJson(credentialsJson);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    // Only do this if we did not set it through the constructor.
    if (subscriber == null) {
      if (useStreamingPull) {
        subscriber = new StreamingPullSubscriber(
            receiver -> {
              Subscriber.Builder builder = Subscriber.newBuilder(cpsSubscription, receiver)
                  .setCredentialsProvider(gcpCredentialsProvider)
                  .setFlowControlSettings(
                      FlowControlSettings.newBuilder()
                          .setLimitExceededBehavior(LimitExceededBehavior.Block)
                          .setMaxOutstandingElementCount(streamingPullMessages)
                          .setMaxOutstandingRequestBytes(streamingPullBytes).build())
                  .setParallelPullCount(streamingPullParallelStreams)
                  .setEndpoint(cpsEndpoint)
                  .setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()));
              if (streamingPullMaxAckDeadlineMs > 0) {
                builder.setMaxAckExtensionPeriod(Duration.ofMillis(streamingPullMaxAckDeadlineMs));
              }
              if (streamingPullMaxMsPerAckDeadlineExtension > 0) {
                builder.setMaxDurationPerAckExtension(
                    Duration.ofMillis(streamingPullMaxMsPerAckDeadlineExtension));
              }
              return builder.build();
            });
      } else {
        subscriber = new AckBatchingSubscriber(
            new CloudPubSubRoundRobinSubscriber(NUM_CPS_SUBSCRIBERS,
                gcpCredentialsProvider,
                cpsEndpoint, cpsSubscription, cpsMaxBatchSize), runnable -> getSystemExecutor()
            .scheduleAtFixedRate(runnable, 100, 100, TimeUnit.MILLISECONDS));
      }
    }
    standardAttributes.add(kafkaMessageKeyAttribute);
    standardAttributes.add(kafkaMessageTimestampAttribute);
    log.info("Started a CloudPubSubSourceTask.");
  }