in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java [94:182]
public void start(Map<String, String> props) {
Map<String, Object> validatedProps = new CloudPubSubSourceConnector().config().parse(props);
cpsSubscription = ProjectSubscriptionName.newBuilder()
.setProject(validatedProps.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString())
.setSubscription(
validatedProps.get(CloudPubSubSourceConnector.CPS_SUBSCRIPTION_CONFIG).toString())
.build();
String cpsEndpoint = (String) validatedProps.get(ConnectorUtils.CPS_ENDPOINT);
kafkaTopic = validatedProps.get(CloudPubSubSourceConnector.KAFKA_TOPIC_CONFIG).toString();
int cpsMaxBatchSize = (Integer) validatedProps
.get(CloudPubSubSourceConnector.CPS_MAX_BATCH_SIZE_CONFIG);
kafkaPartitions =
(Integer) validatedProps.get(CloudPubSubSourceConnector.KAFKA_PARTITIONS_CONFIG);
kafkaMessageKeyAttribute =
(String) validatedProps.get(CloudPubSubSourceConnector.KAFKA_MESSAGE_KEY_CONFIG);
kafkaMessageTimestampAttribute =
(String) validatedProps.get(CloudPubSubSourceConnector.KAFKA_MESSAGE_TIMESTAMP_CONFIG);
kafkaPartitionScheme =
PartitionScheme.getEnum(
(String) validatedProps.get(CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG));
useKafkaHeaders = (Boolean) validatedProps.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS);
makeOrderingKeyAttribute =
(Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_MAKE_ORDERING_KEY_ATTRIBUTE);
ConnectorCredentialsProvider gcpCredentialsProvider = new ConnectorCredentialsProvider();
String gcpCredentialsFilePath = (String) validatedProps
.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
String credentialsJson = (String) validatedProps
.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
boolean useStreamingPull = (Boolean) validatedProps
.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_ENABLED);
long streamingPullBytes = (Long) validatedProps
.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_FLOW_CONTROL_BYTES);
long streamingPullMessages = (Long) validatedProps
.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_FLOW_CONTROL_MESSAGES);
int streamingPullParallelStreams = (Integer) validatedProps
.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_PARALLEL_STREAMS);
long streamingPullMaxAckDeadlineMs = (Long) validatedProps
.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_ACK_EXTENSION_MS);
long streamingPullMaxMsPerAckDeadlineExtension = (Long) validatedProps
.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION);
if (gcpCredentialsFilePath != null) {
try {
gcpCredentialsProvider.loadFromFile(gcpCredentialsFilePath);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (credentialsJson != null) {
try {
gcpCredentialsProvider.loadJson(credentialsJson);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
// Only do this if we did not set it through the constructor.
if (subscriber == null) {
if (useStreamingPull) {
subscriber = new StreamingPullSubscriber(
receiver -> {
Subscriber.Builder builder = Subscriber.newBuilder(cpsSubscription, receiver)
.setCredentialsProvider(gcpCredentialsProvider)
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.setMaxOutstandingElementCount(streamingPullMessages)
.setMaxOutstandingRequestBytes(streamingPullBytes).build())
.setParallelPullCount(streamingPullParallelStreams)
.setEndpoint(cpsEndpoint)
.setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()));
if (streamingPullMaxAckDeadlineMs > 0) {
builder.setMaxAckExtensionPeriod(Duration.ofMillis(streamingPullMaxAckDeadlineMs));
}
if (streamingPullMaxMsPerAckDeadlineExtension > 0) {
builder.setMaxDurationPerAckExtension(
Duration.ofMillis(streamingPullMaxMsPerAckDeadlineExtension));
}
return builder.build();
});
} else {
subscriber = new AckBatchingSubscriber(
new CloudPubSubRoundRobinSubscriber(NUM_CPS_SUBSCRIBERS,
gcpCredentialsProvider,
cpsEndpoint, cpsSubscription, cpsMaxBatchSize), runnable -> getSystemExecutor()
.scheduleAtFixedRate(runnable, 100, 100, TimeUnit.MILLISECONDS));
}
}
standardAttributes.add(kafkaMessageKeyAttribute);
standardAttributes.add(kafkaMessageTimestampAttribute);
log.info("Started a CloudPubSubSourceTask.");
}