in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubGRPCSubscriber.java [87:110]
private void makeSubscriber() {
try {
if (subscriber != null) {
subscriber.close();
}
log.info("Creating subscriber.");
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 << 20) // 20MB
.build())
.setCredentialsProvider(gcpCredentialsProvider)
.setEndpoint(endpoint)
.build();
subscriber = GrpcSubscriberStub.create(subscriberStubSettings);
// We change the subscriber every 25 - 35 minutes in order to avoid GOAWAY errors.
nextSubscriberResetTime =
System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(10 * 60 * 1000)
+ 25 * 60 * 1000;
} catch (IOException e) {
throw new RuntimeException("Could not create subscriber stub; no subscribing can occur.", e);
}
}