in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java [93:158]
/**
 * Creates a {@link ConsumerBuilder} for the given client and schema and populates it from the
 * supplied source configuration.
 *
 * <p>Every consumer-related option is routed to its builder setter through {@code
 * configuration.useOption(...)}. NOTE(review): presumably {@code useOption} only invokes the
 * setter when the option has a value; confirm against {@code SourceConfiguration}. The setters
 * are invoked in a fixed order which is kept stable on purpose.
 *
 * <p>Regardless of the configuration, the subscription type is always set to {@code Exclusive}
 * and the batch receive policy is always set to {@code DISABLED_BATCH_RECEIVE_POLICY}.
 *
 * @param client the Pulsar client handed to the underlying {@code PulsarConsumerBuilder}
 * @param schema the message schema handed to the underlying {@code PulsarConsumerBuilder}
 * @param configuration the source configuration the consumer options are read from
 * @param <T> the message type described by {@code schema}
 * @return the populated consumer builder
 */
public static <T> ConsumerBuilder<T> createConsumerBuilder(
        PulsarClient client, Schema<T> schema, SourceConfiguration configuration) {
    ConsumerBuilder<T> consumer = new PulsarConsumerBuilder<>(client, schema);

    // Subscription name and acknowledgement timing.
    configuration.useOption(PULSAR_SUBSCRIPTION_NAME, consumer::subscriptionName);
    configuration.useOption(
            PULSAR_ACK_TIMEOUT_MILLIS,
            timeoutMillis -> consumer.ackTimeout(timeoutMillis, MILLISECONDS));
    configuration.useOption(PULSAR_ACK_RECEIPT_ENABLED, consumer::isAckReceiptEnabled);
    configuration.useOption(
            PULSAR_TICK_DURATION_MILLIS,
            tickMillis -> consumer.ackTimeoutTickTime(tickMillis, MILLISECONDS));
    configuration.useOption(
            PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS,
            delayMicros -> consumer.negativeAckRedeliveryDelay(delayMicros, MICROSECONDS));

    // Subscription behaviour, crypto handling and receiver queue sizing.
    configuration.useOption(PULSAR_SUBSCRIPTION_MODE, consumer::subscriptionMode);
    configuration.useOption(PULSAR_SUBSCRIPTION_PROPERTIES, consumer::subscriptionProperties);
    configuration.useOption(PULSAR_CRYPTO_FAILURE_ACTION, consumer::cryptoFailureAction);
    configuration.useOption(PULSAR_RECEIVER_QUEUE_SIZE, consumer::receiverQueueSize);
    configuration.useOption(
            PULSAR_ACKNOWLEDGEMENTS_GROUP_TIME_MICROS,
            groupMicros -> consumer.acknowledgmentGroupTime(groupMicros, MICROSECONDS));
    configuration.useOption(
            PULSAR_REPLICATE_SUBSCRIPTION_STATE, consumer::replicateSubscriptionState);
    configuration.useOption(
            PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS,
            consumer::maxTotalReceiverQueueSizeAcrossPartitions);

    // The configured consumer name is used as a format pattern and is filled with a random UUID.
    configuration.useOption(
            PULSAR_CONSUMER_NAME,
            namePattern -> String.format(namePattern, UUID.randomUUID()),
            consumer::consumerName);
    configuration.useOption(PULSAR_READ_COMPACTED, consumer::readCompacted);
    configuration.useOption(PULSAR_PRIORITY_LEVEL, consumer::priorityLevel);

    // Dead letter policy (if one could be created) and retry.
    createDeadLetterPolicy(configuration).ifPresent(consumer::deadLetterPolicy);
    configuration.useOption(PULSAR_RETRY_ENABLE, consumer::enableRetry);

    // Chunked message handling and message pooling.
    configuration.useOption(
            PULSAR_MAX_PENDING_CHUNKED_MESSAGE, consumer::maxPendingChunkedMessage);
    configuration.useOption(
            PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL,
            consumer::autoAckOldestChunkedMessageOnQueueFull);
    configuration.useOption(
            PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS,
            expireMillis ->
                    consumer.expireTimeOfIncompleteChunkedMessage(expireMillis, MILLISECONDS));
    configuration.useOption(PULSAR_POOL_MESSAGES, consumer::poolMessages);

    if (!configuration.contains(PULSAR_MEMORY_LIMIT_BYTES)) {
        // No memory limit configured: honour the dedicated auto-scale option.
        configuration.useOption(
                PULSAR_AUTO_SCALED_RECEIVER_QUEUE_SIZE_ENABLED,
                consumer::autoScaledReceiverQueueSizeEnabled);
    } else {
        // A configured memory limit forces receiver queue auto-scaling on.
        consumer.autoScaledReceiverQueueSizeEnabled(true);
    }

    // Custom consumer properties are only passed on when at least one entry exists.
    Map<String, String> consumerProperties =
            configuration.getProperties(PULSAR_CONSUMER_PROPERTIES);
    if (!consumerProperties.isEmpty()) {
        consumer.properties(consumerProperties);
    }

    // The connector exclusively works with the Exclusive subscription type.
    consumer.subscriptionType(SubscriptionType.Exclusive);

    // Batch receiving is of no use to the Flink connector, so the batch-receive
    // timer of the Consumer instance is switched off.
    consumer.batchReceivePolicy(DISABLED_BATCH_RECEIVE_POLICY);

    return consumer;
}