private void createPublisher()

in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [397:439]


  private void createPublisher() {
    ProjectTopicName fullTopic = ProjectTopicName.of(cpsProject, cpsTopic);

    BatchingSettings.Builder batchingSettings = BatchingSettings.newBuilder()
        .setDelayThreshold(Duration.ofMillis(maxDelayThresholdMs))
        .setElementCountThreshold(maxBufferSize)
        .setRequestByteThreshold(maxBufferBytes);

    if (useFlowControl()) {
      batchingSettings.setFlowControlSettings(FlowControlSettings.newBuilder()
          .setMaxOutstandingRequestBytes(maxOutstandingRequestBytes)
          .setMaxOutstandingElementCount(maxOutstandingMessages)
          .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
          .build());
    }

    com.google.cloud.pubsub.v1.Publisher.Builder builder =
        com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic)
            .setCredentialsProvider(gcpCredentialsProvider)
            .setBatchingSettings(batchingSettings.build())
            .setRetrySettings(
                RetrySettings.newBuilder()
                    // All values that are not configurable come from the defaults for the publisher
                    // client library.
                    .setTotalTimeout(Duration.ofMillis(maxTotalTimeoutMs))
                    .setMaxRpcTimeout(Duration.ofMillis(maxRequestTimeoutMs))
                    .setInitialRetryDelay(Duration.ofMillis(5))
                    .setRetryDelayMultiplier(2)
                    .setMaxRetryDelay(Duration.ofMillis(Long.MAX_VALUE))
                    .setInitialRpcTimeout(Duration.ofSeconds(10))
                    .setRpcTimeoutMultiplier(2)
                    .build())
            .setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()))
            .setEndpoint(cpsEndpoint);
    if (orderingKeySource != OrderingKeySource.NONE) {
      builder.setEnableMessageOrdering(true); 
    }
    try {
      publisher = builder.build();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }