in sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java [236:443]
public Read<?, ?> fromConfigRow(Row configRow, PipelineOptions options) {
  // Rebuilds a KafkaIO Read transform from the fields stored in 'configRow'.
  //
  // Starting from KafkaIO.read(), each field of the row is inspected in turn and, when it is
  // present, applied through the matching with*() builder call. Fields holding serialized
  // objects are decoded with fromByteArray. Some fields are only consulted when the pipeline's
  // update compatibility version is at or above a given Beam release.
  //
  // Throws IllegalArgumentException when a required nested value is null, and RuntimeException
  // (wrapping the InvalidClassException) when a serialized field cannot be decoded.
  String compatVersion = options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
  if (compatVersion == null) {
    // We need a default here since this PipelineOption is not correctly passed in for
    // pipelines that use Beam 2.55.0. This is fixed for Beam 2.56.0 and later.
    compatVersion = "2.55.0";
  }

  try {
    Read<?, ?> read = KafkaIO.read();

    // Consumer config: keep only allowed properties and decode each value.
    Map<String, byte[]> encodedConsumerConfig = configRow.getMap("consumer_config");
    if (encodedConsumerConfig != null) {
      Map<String, Object> decodedConsumerConfig = new HashMap<>();
      for (Map.Entry<String, byte[]> entry : encodedConsumerConfig.entrySet()) {
        String property = entry.getKey();
        if (KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(property)) {
          continue;
        }
        byte[] encodedValue = entry.getValue();
        if (encodedValue == null) {
          throw new IllegalArgumentException(
              "Encoded value of the consumer config property " + property + " was null");
        }
        // An InvalidClassException raised here is wrapped by the method-level handler below.
        decodedConsumerConfig.put(property, fromByteArray(encodedValue));
      }
      read = read.withConsumerConfigUpdates(decodedConsumerConfig);
    }

    // Topic selection: explicit topics, explicit topic partitions, or a topic pattern.
    Collection<String> topicNames = configRow.getArray("topics");
    if (topicNames != null) {
      read = read.withTopics(new ArrayList<>(topicNames));
    }

    Collection<Row> partitionRows = configRow.getArray("topic_partitions");
    if (partitionRows != null && !partitionRows.isEmpty()) {
      Collection<TopicPartition> partitions = new ArrayList<>();
      for (Row partitionRow : partitionRows) {
        String topicName = partitionRow.getString("topic");
        if (topicName == null) {
          throw new IllegalArgumentException("Expected the topic to be not null");
        }
        Integer partitionId = partitionRow.getInt32("partition");
        if (partitionId == null) {
          throw new IllegalArgumentException("Expected the partition to be not null");
        }
        partitions.add(new TopicPartition(topicName, partitionId));
      }
      read = read.withTopicPartitions(Lists.newArrayList(partitions));
    }

    String pattern = configRow.getString("topic_pattern");
    if (pattern != null) {
      read = read.withTopicPattern(pattern);
    }

    // Key deserializer provider, with its coder when one was stored alongside it.
    byte[] keyProviderBytes = configRow.getBytes("key_deserializer_provider");
    if (keyProviderBytes != null) {
      byte[] keyCoderBytes = configRow.getBytes("key_coder");
      if (keyCoderBytes == null) {
        read = read.withKeyDeserializer((DeserializerProvider) fromByteArray(keyProviderBytes));
      } else {
        read =
            read.withKeyDeserializerProviderAndCoder(
                (DeserializerProvider) fromByteArray(keyProviderBytes),
                (org.apache.beam.sdk.coders.Coder) fromByteArray(keyCoderBytes));
      }
    }

    // Value deserializer provider, handled the same way as the key side.
    byte[] valueProviderBytes = configRow.getBytes("value_deserializer_provider");
    if (valueProviderBytes != null) {
      byte[] valueCoderBytes = configRow.getBytes("value_coder");
      if (valueCoderBytes == null) {
        read =
            read.withValueDeserializer((DeserializerProvider) fromByteArray(valueProviderBytes));
      } else {
        read =
            read.withValueDeserializerProviderAndCoder(
                (DeserializerProvider) fromByteArray(valueProviderBytes),
                (org.apache.beam.sdk.coders.Coder) fromByteArray(valueCoderBytes));
      }
    }

    byte[] consumerFactoryBytes = configRow.getBytes("consumer_factory_fn");
    if (consumerFactoryBytes != null) {
      read =
          read.withConsumerFactoryFn(
              (SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>)
                  fromByteArray(consumerFactoryBytes));
    }

    byte[] watermarkFnBytes = configRow.getBytes("watermark_fn");
    if (watermarkFnBytes != null) {
      read = read.withWatermarkFn2((SerializableFunction) fromByteArray(watermarkFnBytes));
    }

    Long recordLimit = configRow.getInt64("max_num_records");
    if (recordLimit != null) {
      read = read.withMaxNumRecords(recordLimit);
    }

    // The redistribute fields are only read for compatibility version 2.58.0 and later.
    if (TransformUpgrader.compareVersions(compatVersion, "2.58.0") >= 0) {
      if (Boolean.TRUE.equals(configRow.getBoolean("redistribute"))) {
        read = read.withRedistribute();

        // A missing key count is treated like 0, which leaves the setting untouched.
        Integer numKeys = Integer.valueOf(0);
        if (configRow.getValue("redistribute_num_keys") != null) {
          numKeys = configRow.getInt32("redistribute_num_keys");
        }
        if (numKeys != null && !numKeys.equals(0)) {
          read = read.withRedistributeNumKeys(numKeys);
        }

        Boolean duplicatesAllowed = configRow.getBoolean("allows_duplicates");
        if (duplicatesAllowed != null && duplicatesAllowed) {
          read = read.withAllowDuplicates(duplicatesAllowed);
        }
      }
    }

    // Offset deduplication is only read for compatibility version 2.63.0 and later.
    if (TransformUpgrader.compareVersions(compatVersion, "2.63.0") >= 0) {
      @Nullable Boolean dedupOffsets = configRow.getValue("offset_deduplication");
      if (dedupOffsets != null) {
        read = read.withOffsetDeduplication(dedupOffsets);
      }
    }

    // The row stores a java.time Duration; the builder expects a Joda Duration.
    Duration readTimeLimit = configRow.getValue("max_read_time");
    if (readTimeLimit != null) {
      read = read.withMaxReadTime(org.joda.time.Duration.millis(readTimeLimit.toMillis()));
    }

    if (TransformUpgrader.compareVersions(compatVersion, "2.56.0") >= 0) {
      Long pollingTimeout = configRow.getInt64("consumer_polling_timeout");
      if (pollingTimeout != null) {
        read = read.withConsumerPollingTimeout(pollingTimeout);
      }
    } else {
      // Compatibility versions before 2.56.0: set to current default.
      read = read.withConsumerPollingTimeout(2L);
    }

    Instant readFrom = configRow.getValue("start_read_time");
    if (readFrom != null) {
      read = read.withStartReadTime(readFrom);
    }

    Instant readUntil = configRow.getValue("stop_read_time");
    if (readUntil != null) {
      read = read.withStopReadTime(readUntil);
    }

    if (Boolean.TRUE.equals(configRow.getBoolean("is_commit_offset_finalize_enabled"))) {
      read = read.commitOffsetsInFinalize();
    }

    // Dynamic read requires the watch duration to have been stored as well.
    if (Boolean.TRUE.equals(configRow.getBoolean("is_dynamic_read"))) {
      Duration watchInterval = configRow.getValue("watch_topic_partition_duration");
      if (watchInterval == null) {
        throw new IllegalArgumentException(
            "Expected watchTopicPartitionDuration to be available when isDynamicRead is set to true");
      }
      read = read.withDynamicRead(org.joda.time.Duration.millis(watchInterval.toMillis()));
    }

    byte[] timestampPolicyBytes = configRow.getBytes("timestamp_policy_factory");
    if (timestampPolicyBytes != null) {
      read =
          read.withTimestampPolicyFactory(
              (TimestampPolicyFactory) fromByteArray(timestampPolicyBytes));
    }

    // Offset consumer config: every entry is decoded; no property filtering is applied here.
    Map<String, byte[]> encodedOffsetConfig = configRow.getMap("offset_consumer_config");
    if (encodedOffsetConfig != null) {
      Map<String, Object> decodedOffsetConfig = new HashMap<>();
      for (Map.Entry<String, byte[]> entry : encodedOffsetConfig.entrySet()) {
        String property = entry.getKey();
        byte[] encodedValue = entry.getValue();
        if (encodedValue == null) {
          throw new IllegalArgumentException(
              "Encoded value for the offset consumer config key " + property + " was null.");
        }
        // An InvalidClassException raised here is wrapped by the method-level handler below.
        decodedOffsetConfig.put(property, fromByteArray(encodedValue));
      }
      read = read.withOffsetConsumerConfigOverrides(decodedOffsetConfig);
    }

    byte[] checkStopReadingFnBytes = configRow.getBytes("check_stop_reading_fn");
    if (checkStopReadingFnBytes != null) {
      read =
          read.withCheckStopReadingFn(
              (SerializableFunction<TopicPartition, Boolean>)
                  fromByteArray(checkStopReadingFnBytes));
    }

    return read;
  } catch (InvalidClassException e) {
    throw new RuntimeException(e);
  }
}