in flink-connector/flink-connector-gcp-pubsub/src/main/java/com/google/pubsub/flink/PubSubSource.java [91:130]
Subscriber createSubscriber(MessageReceiver receiver) {
Subscriber.Builder builder =
Subscriber.newBuilder(
ProjectSubscriptionName.of(projectName(), subscriptionName()).toString(), receiver);
// Channel settings copied from com.google.cloud:google-cloud-pubsub:1.124.1.
builder.setChannelProvider(
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB API maximum message size.
.setMaxInboundMetadataSize(4 * 1024 * 1024) // 4MB API maximum metadata size)
.setKeepAliveTime(Duration.ofMinutes(5))
.setHeaderProvider(
FixedHeaderProvider.create(
"x-goog-api-client", "PubSub-Flink-Connector/1.0.0-SNAPSHOT"))
.build());
builder.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(maxOutstandingMessagesCount().or(1000L))
.setMaxOutstandingRequestBytes(
maxOutstandingMessagesBytes().or(100L * 1024L * 1024L)) // 100MB
.build());
if (parallelPullCount().isPresent()) {
builder.setParallelPullCount(parallelPullCount().get());
}
if (credentials().isPresent()) {
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials().get()));
}
if (endpoint().isPresent()) {
builder.setEndpoint(endpoint().get());
}
String emulatorEndpoint = EmulatorEndpoint.getEmulatorEndpoint(endpoint());
if (emulatorEndpoint != null) {
builder.setCredentialsProvider(NoCredentialsProvider.create());
builder.setChannelProvider(
FixedTransportChannelProvider.create(
GrpcTransportChannel.create(
ManagedChannelBuilder.forTarget(emulatorEndpoint).usePlaintext().build())));
}
return builder.build();
}