public Read fromConfigRow()

in sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java [236:443]


    public Read<?, ?> fromConfigRow(Row configRow, PipelineOptions options) {
      // Rebuilds a KafkaIO read transform from the fields stored in 'configRow'.
      //
      // Optional fields are skipped when the row holds null for them, and boolean flags are only
      // applied when they are true. A few fields are consulted only when the update compatibility
      // version taken from 'options' is recent enough (see the version checks below).
      //
      // Throws IllegalArgumentException when a value that must be present is null, and
      // RuntimeException (wrapping InvalidClassException) when a serialized value cannot be
      // decoded by fromByteArray.
      String updateCompatibilityBeamVersion =
          options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
      // We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
      // is not correctly passed in for pipelines that use Beam 2.55.0.
      // This is fixed for Beam 2.56.0 and later.
      updateCompatibilityBeamVersion =
          (updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.55.0";
      try {
        Read<?, ?> transform = KafkaIO.read();

        // Consumer config: values are stored as serialized bytes keyed by property name.
        Map<String, byte[]> consumerConfig = configRow.getMap("consumer_config");
        if (consumerConfig != null) {
          Map<String, Object> updatedConsumerConfig = new HashMap<>();
          consumerConfig.forEach(
              (key, dataBytes) -> {
                // Disallowed properties are dropped; every remaining one is decoded and kept.
                if (KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(key)) {
                  return;
                }
                if (dataBytes == null) {
                  throw new IllegalArgumentException(
                      "Encoded value of the consumer config property " + key + " was null");
                }
                try {
                  updatedConsumerConfig.put(key, fromByteArray(dataBytes));
                } catch (InvalidClassException e) {
                  // Keep the cause and name the property so the failure can be traced.
                  throw new RuntimeException(
                      "Failed to decode the consumer config property " + key, e);
                }
              });
          transform = transform.withConsumerConfigUpdates(updatedConsumerConfig);
        }

        // Source selection: topics, explicit topic partitions, and/or a topic pattern.
        Collection<String> topics = configRow.getArray("topics");
        if (topics != null) {
          transform = transform.withTopics(new ArrayList<>(topics));
        }
        Collection<Row> topicPartitionRows = configRow.getArray("topic_partitions");
        if (topicPartitionRows != null && !topicPartitionRows.isEmpty()) {
          Collection<TopicPartition> topicPartitions =
              topicPartitionRows.stream()
                  .map(
                      row -> {
                        String topic = row.getString("topic");
                        if (topic == null) {
                          throw new IllegalArgumentException("Expected the topic to be not null");
                        }
                        Integer partition = row.getInt32("partition");
                        if (partition == null) {
                          throw new IllegalArgumentException(
                              "Expected the partition to be not null");
                        }
                        return new TopicPartition(topic, partition);
                      })
                  .collect(Collectors.toList());
          transform = transform.withTopicPartitions(Lists.newArrayList(topicPartitions));
        }
        String topicPattern = configRow.getString("topic_pattern");
        if (topicPattern != null) {
          transform = transform.withTopicPattern(topicPattern);
        }

        // Key deserializer: the coder is optional and only read when a provider is present.
        byte[] keyDeserializerProvider = configRow.getBytes("key_deserializer_provider");
        if (keyDeserializerProvider != null) {

          byte[] keyCoder = configRow.getBytes("key_coder");
          if (keyCoder != null) {
            transform =
                transform.withKeyDeserializerProviderAndCoder(
                    (DeserializerProvider) fromByteArray(keyDeserializerProvider),
                    (org.apache.beam.sdk.coders.Coder) fromByteArray(keyCoder));
          } else {
            transform =
                transform.withKeyDeserializer(
                    (DeserializerProvider) fromByteArray(keyDeserializerProvider));
          }
        }

        // Value deserializer: same layout as the key deserializer above.
        byte[] valueDeserializerProvider = configRow.getBytes("value_deserializer_provider");
        if (valueDeserializerProvider != null) {
          byte[] valueCoder = configRow.getBytes("value_coder");
          if (valueCoder != null) {
            transform =
                transform.withValueDeserializerProviderAndCoder(
                    (DeserializerProvider) fromByteArray(valueDeserializerProvider),
                    (org.apache.beam.sdk.coders.Coder) fromByteArray(valueCoder));
          } else {
            transform =
                transform.withValueDeserializer(
                    (DeserializerProvider) fromByteArray(valueDeserializerProvider));
          }
        }

        byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn");
        if (consumerFactoryFn != null) {
          transform =
              transform.withConsumerFactoryFn(
                  (SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>)
                      fromByteArray(consumerFactoryFn));
        }
        byte[] watermarkFn = configRow.getBytes("watermark_fn");
        if (watermarkFn != null) {
          transform = transform.withWatermarkFn2((SerializableFunction) fromByteArray(watermarkFn));
        }
        Long maxNumRecords = configRow.getInt64("max_num_records");
        if (maxNumRecords != null) {
          transform = transform.withMaxNumRecords(maxNumRecords);
        }

        // The redistribute fields are only read for compatibility versions 2.58.0 and later.
        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.58.0") >= 0) {
          Boolean isRedistributed = configRow.getBoolean("redistribute");
          if (isRedistributed != null && isRedistributed) {
            transform = transform.withRedistribute();
            // A missing key count is treated as 0, which leaves the transform's setting untouched.
            Integer redistributeNumKeys =
                configRow.getValue("redistribute_num_keys") == null
                    ? Integer.valueOf(0)
                    : configRow.getInt32("redistribute_num_keys");
            if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) {
              transform = transform.withRedistributeNumKeys(redistributeNumKeys);
            }
            Boolean allowDuplicates = configRow.getBoolean("allows_duplicates");
            if (allowDuplicates != null && allowDuplicates) {
              transform = transform.withAllowDuplicates(allowDuplicates);
            }
          }
        }
        // The offset deduplication field is only read for versions 2.63.0 and later.
        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.63.0") >= 0) {
          @Nullable Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
          if (offsetDeduplication != null) {
            transform = transform.withOffsetDeduplication(offsetDeduplication);
          }
        }
        // The row value exposes toMillis(); it is converted to a Joda duration for the transform.
        Duration maxReadTime = configRow.getValue("max_read_time");
        if (maxReadTime != null) {
          transform =
              transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
        }
        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.56.0") < 0) {
          // set to current default
          transform = transform.withConsumerPollingTimeout(2L);
        } else {
          Long consumerPollingTimeout = configRow.getInt64("consumer_polling_timeout");
          if (consumerPollingTimeout != null) {
            transform = transform.withConsumerPollingTimeout(consumerPollingTimeout);
          }
        }
        Instant startReadTime = configRow.getValue("start_read_time");
        if (startReadTime != null) {
          transform = transform.withStartReadTime(startReadTime);
        }
        Instant stopReadTime = configRow.getValue("stop_read_time");
        if (stopReadTime != null) {
          transform = transform.withStopReadTime(stopReadTime);
        }
        Boolean isCommitOffsetFinalizeEnabled =
            configRow.getBoolean("is_commit_offset_finalize_enabled");
        if (isCommitOffsetFinalizeEnabled != null && isCommitOffsetFinalizeEnabled) {
          transform = transform.commitOffsetsInFinalize();
        }
        // A dynamic read requires the watch duration to be present in the row.
        Boolean isDynamicRead = configRow.getBoolean("is_dynamic_read");
        if (isDynamicRead != null && isDynamicRead) {
          Duration watchTopicPartitionDuration =
              configRow.getValue("watch_topic_partition_duration");
          if (watchTopicPartitionDuration == null) {
            throw new IllegalArgumentException(
                "Expected watchTopicPartitionDuration to be available when isDynamicRead is set to true");
          }
          transform =
              transform.withDynamicRead(
                  org.joda.time.Duration.millis(watchTopicPartitionDuration.toMillis()));
        }

        byte[] timestampPolicyFactory = configRow.getBytes("timestamp_policy_factory");
        if (timestampPolicyFactory != null) {
          transform =
              transform.withTimestampPolicyFactory(
                  (TimestampPolicyFactory) fromByteArray(timestampPolicyFactory));
        }

        // Offset consumer config: unlike the consumer config above, no keys are filtered out.
        Map<String, byte[]> offsetConsumerConfig = configRow.getMap("offset_consumer_config");
        if (offsetConsumerConfig != null) {
          Map<String, Object> updatedOffsetConsumerConfig = new HashMap<>();
          offsetConsumerConfig.forEach(
              (key, dataBytes) -> {
                if (dataBytes == null) {
                  throw new IllegalArgumentException(
                      "Encoded value for the offset consumer config key " + key + " was null.");
                }
                try {
                  updatedOffsetConsumerConfig.put(key, fromByteArray(dataBytes));
                } catch (InvalidClassException e) {
                  // Keep the cause and name the key so the failure can be traced.
                  throw new RuntimeException(
                      "Failed to decode the offset consumer config key " + key, e);
                }
              });
          transform = transform.withOffsetConsumerConfigOverrides(updatedOffsetConsumerConfig);
        }

        byte[] checkStopReadingFn = configRow.getBytes("check_stop_reading_fn");
        if (checkStopReadingFn != null) {
          transform =
              transform.withCheckStopReadingFn(
                  (SerializableFunction<TopicPartition, Boolean>)
                      fromByteArray(checkStopReadingFn));
        }

        return transform;
      } catch (InvalidClassException e) {
        // Raised by any of the fromByteArray calls made directly in this method body.
        throw new RuntimeException(
            "Failed to decode a serialized value of the Kafka read config row", e);
      }
    }