in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [397:439]
private void createPublisher() {
ProjectTopicName fullTopic = ProjectTopicName.of(cpsProject, cpsTopic);
BatchingSettings.Builder batchingSettings = BatchingSettings.newBuilder()
.setDelayThreshold(Duration.ofMillis(maxDelayThresholdMs))
.setElementCountThreshold(maxBufferSize)
.setRequestByteThreshold(maxBufferBytes);
if (useFlowControl()) {
batchingSettings.setFlowControlSettings(FlowControlSettings.newBuilder()
.setMaxOutstandingRequestBytes(maxOutstandingRequestBytes)
.setMaxOutstandingElementCount(maxOutstandingMessages)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build());
}
com.google.cloud.pubsub.v1.Publisher.Builder builder =
com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic)
.setCredentialsProvider(gcpCredentialsProvider)
.setBatchingSettings(batchingSettings.build())
.setRetrySettings(
RetrySettings.newBuilder()
// All values that are not configurable come from the defaults for the publisher
// client library.
.setTotalTimeout(Duration.ofMillis(maxTotalTimeoutMs))
.setMaxRpcTimeout(Duration.ofMillis(maxRequestTimeoutMs))
.setInitialRetryDelay(Duration.ofMillis(5))
.setRetryDelayMultiplier(2)
.setMaxRetryDelay(Duration.ofMillis(Long.MAX_VALUE))
.setInitialRpcTimeout(Duration.ofSeconds(10))
.setRpcTimeoutMultiplier(2)
.build())
.setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()))
.setEndpoint(cpsEndpoint);
if (orderingKeySource != OrderingKeySource.NONE) {
builder.setEnableMessageOrdering(true);
}
try {
publisher = builder.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}