in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java [160:175]
private static Optional<DeadLetterPolicy> createDeadLetterPolicy(
SourceConfiguration configuration) {
if (configuration.contains(PULSAR_MAX_REDELIVER_COUNT)
|| configuration.contains(PULSAR_RETRY_LETTER_TOPIC)
|| configuration.contains(PULSAR_DEAD_LETTER_TOPIC)) {
DeadLetterPolicy.DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder();
configuration.useOption(PULSAR_MAX_REDELIVER_COUNT, builder::maxRedeliverCount);
configuration.useOption(PULSAR_RETRY_LETTER_TOPIC, builder::retryLetterTopic);
configuration.useOption(PULSAR_DEAD_LETTER_TOPIC, builder::deadLetterTopic);
return Optional.of(builder.build());
} else {
return Optional.empty();
}
}