Subscriber createSubscriber()

in flink-connector/flink-connector-gcp-pubsub/src/main/java/com/google/pubsub/flink/PubSubSource.java [91:130]


  /**
   * Builds a {@link Subscriber} for the subscription identified by {@code projectName()} and
   * {@code subscriptionName()}, handing pulled messages to the supplied receiver.
   *
   * <p>Flow control falls back to 1000 outstanding messages and 100MB of outstanding bytes when
   * the corresponding options are absent. Parallel pull count, credentials and endpoint are only
   * applied when they are present. If {@code EmulatorEndpoint.getEmulatorEndpoint} yields a
   * non-null target, the credentials and channel configured earlier are replaced with
   * no-credentials and a plaintext channel to that target.
   *
   * @param receiver callback passed to {@code Subscriber.newBuilder}
   * @return the configured subscriber as returned by the builder's {@code build()}
   */
  Subscriber createSubscriber(MessageReceiver receiver) {
    String subscriptionPath =
        ProjectSubscriptionName.of(projectName(), subscriptionName()).toString();
    Subscriber.Builder subscriberBuilder = Subscriber.newBuilder(subscriptionPath, receiver);

    // Channel settings copied from com.google.cloud:google-cloud-pubsub:1.124.1.
    final int maxInboundMessageBytes = 20 * 1024 * 1024; // 20MB API maximum message size.
    final int maxInboundMetadataBytes = 4 * 1024 * 1024; // 4MB API maximum metadata size.
    subscriberBuilder.setChannelProvider(
        SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
            .setMaxInboundMessageSize(maxInboundMessageBytes)
            .setMaxInboundMetadataSize(maxInboundMetadataBytes)
            .setKeepAliveTime(Duration.ofMinutes(5))
            .setHeaderProvider(
                FixedHeaderProvider.create(
                    "x-goog-api-client", "PubSub-Flink-Connector/1.0.0-SNAPSHOT"))
            .build());

    // Flow control limits, with defaults used when the options are not set.
    final long defaultMaxOutstandingMessages = 1000L;
    final long defaultMaxOutstandingBytes = 100L * 1024L * 1024L; // 100MB
    subscriberBuilder.setFlowControlSettings(
        FlowControlSettings.newBuilder()
            .setMaxOutstandingElementCount(
                maxOutstandingMessagesCount().or(defaultMaxOutstandingMessages))
            .setMaxOutstandingRequestBytes(
                maxOutstandingMessagesBytes().or(defaultMaxOutstandingBytes))
            .build());

    // Optional overrides: each one is applied only if it is present.
    if (parallelPullCount().isPresent()) {
      subscriberBuilder.setParallelPullCount(parallelPullCount().get());
    }
    if (credentials().isPresent()) {
      subscriberBuilder.setCredentialsProvider(
          FixedCredentialsProvider.create(credentials().get()));
    }
    if (endpoint().isPresent()) {
      subscriberBuilder.setEndpoint(endpoint().get());
    }

    String emulatorTarget = EmulatorEndpoint.getEmulatorEndpoint(endpoint());
    if (emulatorTarget == null) {
      // No emulator target: keep the channel and credentials configured above.
      return subscriberBuilder.build();
    }

    // Emulator target: override the credentials and channel set above with no-credentials and
    // a plaintext channel to the emulator.
    subscriberBuilder.setCredentialsProvider(NoCredentialsProvider.create());
    subscriberBuilder.setChannelProvider(
        FixedTransportChannelProvider.create(
            GrpcTransportChannel.create(
                ManagedChannelBuilder.forTarget(emulatorTarget).usePlaintext().build())));
    return subscriberBuilder.build();
  }