public Poller newPoller()

in kafka-connector/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java [16:38]


  /**
   * Creates a {@link Poller} backed by a Pub/Sub Lite consumer configured from {@code params}.
   *
   * <p>The parameters are validated against {@code ConfigDefs.config()}. The project, location and
   * subscription name flags are combined into the subscription path, and the two flow control
   * flags bound the messages and bytes outstanding per partition. The consumer is created with
   * autocommit enabled and subscribed to every topic it lists.
   *
   * @param params raw connector configuration keyed by the flag names declared in {@code
   *     ConfigDefs}
   * @return a poller constructed from the {@code KAFKA_TOPIC_FLAG} value and the subscribed
   *     consumer
   * @throws IllegalArgumentException if validation reports an error for any parameter
   */
  public Poller newPoller(Map<String, String> params) {
    Map<String, ConfigValue> config = ConfigDefs.config().validateAll(params);

    // validateAll records problems on each ConfigValue instead of throwing, so surface them here.
    // Otherwise a missing or malformed setting only shows up later as a NullPointerException (or
    // is silently concatenated into the project path as the text "null").
    StringBuilder errors = new StringBuilder();
    for (ConfigValue value : config.values()) {
      for (String message : value.errorMessages()) {
        if (errors.length() > 0) {
          errors.append("; ");
        }
        errors.append(message);
      }
    }
    if (errors.length() > 0) {
      throw new IllegalArgumentException("Invalid Pub/Sub Lite source configuration: " + errors);
    }

    SubscriptionPath path = SubscriptionPath.newBuilder()
        .setProject(ProjectPath
            .parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value()).project())
        .setLocation(CloudZone
            .parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString()))
        .setName(
            SubscriptionName.of(config.get(ConfigDefs.SUBSCRIPTION_NAME_FLAG).value().toString()))
        .build();
    FlowControlSettings flowControlSettings = FlowControlSettings.builder()
        .setMessagesOutstanding(
            (Long) config.get(ConfigDefs.FLOW_CONTROL_PARTITION_MESSAGES_FLAG).value())
        .setBytesOutstanding(
            (Long) config.get(ConfigDefs.FLOW_CONTROL_PARTITION_BYTES_FLAG).value()).build();
    // Resolve the destination topic before creating the consumer so that nothing outside the
    // guarded region below can fail once the consumer exists.
    String kafkaTopic = config.get(ConfigDefs.KAFKA_TOPIC_FLAG).value().toString();

    Consumer<byte[], byte[]> consumer = ConsumerSettings.newBuilder().setAutocommit(true)
        .setSubscriptionPath(
            path).setPerPartitionFlowControlSettings(flowControlSettings).build().instantiate();
    try {
      // There is only one topic for Pub/Sub Lite subscriptions, and the consumer only exposes this
      // topic.
      consumer.subscribe(consumer.listTopics().keySet());
      return new PollerImpl(kafkaTopic, consumer);
    } catch (RuntimeException e) {
      // Ownership of the consumer was never handed to a Poller, so release it here rather than
      // leaking it. A failure while closing must not mask the original error.
      try {
        consumer.close();
      } catch (RuntimeException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }