private static RecordAppender getRecordAppender()

in kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/KafkaMetadataTransform.java [128:228]


  private static RecordAppender getRecordAppender(SimpleConfig config) {
    RecordAppender externalFieldAppender;
    String metadataFieldName = config.getString(KEY_METADATA_FIELD_NAME);
    Boolean nestedMetadata = config.getBoolean(KEY_METADATA_IS_NESTED);

    String topicFieldName;
    String partitionFieldName;
    String offsetFieldName;
    String timestampFieldName;

    if (nestedMetadata) {
      externalFieldAppender =
          getExternalFieldAppender(config.getString(EXTERNAL_KAFKA_METADATA), name -> name);

      SchemaBuilder nestedSchemaBuilder = SchemaBuilder.struct();
      nestedSchemaBuilder
          .field(TOPIC, Schema.STRING_SCHEMA)
          .field(PARTITION, Schema.INT32_SCHEMA)
          .field(OFFSET, Schema.INT64_SCHEMA)
          .field(TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA);
      externalFieldAppender.addToSchema(nestedSchemaBuilder);

      Schema nestedSchema = nestedSchemaBuilder.build();

      return new RecordAppender() {
        @Override
        public void addToSchema(SchemaBuilder builder) {
          builder.field(metadataFieldName, nestedSchema);
        }

        @Override
        public void addToStruct(SinkRecord record, Struct struct) {
          Struct nested = new Struct(nestedSchema);
          nested.put(TOPIC, record.topic());
          nested.put(PARTITION, record.kafkaPartition());
          nested.put(OFFSET, record.kafkaOffset());
          if (record.timestamp() != null) {
            nested.put(TIMESTAMP, record.timestamp());
          }
          externalFieldAppender.addToStruct(record, nested);
          struct.put(metadataFieldName, nested);
        }

        @Override
        public void addToMap(SinkRecord record, Map<String, Object> map) {
          Map<String, Object> nested = Maps.newHashMap();
          nested.put(TOPIC, record.topic());
          nested.put(PARTITION, record.kafkaPartition());
          nested.put(OFFSET, record.kafkaOffset());
          if (record.timestamp() != null) {
            nested.put(TIMESTAMP, record.timestamp());
          }
          externalFieldAppender.addToMap(record, nested);
          map.put(metadataFieldName, nested);
        }
      };

    } else {
      Function<String, String> namer = name -> String.format("%s_%s", metadataFieldName, name);
      topicFieldName = namer.apply(TOPIC);
      partitionFieldName = namer.apply(PARTITION);
      offsetFieldName = namer.apply(OFFSET);
      timestampFieldName = namer.apply(TIMESTAMP);

      externalFieldAppender =
          getExternalFieldAppender(config.getString(EXTERNAL_KAFKA_METADATA), namer);
      return new RecordAppender() {
        @Override
        public void addToSchema(SchemaBuilder builder) {
          builder
              .field(topicFieldName, Schema.STRING_SCHEMA)
              .field(partitionFieldName, Schema.INT32_SCHEMA)
              .field(offsetFieldName, Schema.INT64_SCHEMA)
              .field(timestampFieldName, Schema.OPTIONAL_INT64_SCHEMA);
          externalFieldAppender.addToSchema(builder);
        }

        @Override
        public void addToStruct(SinkRecord record, Struct struct) {
          struct.put(topicFieldName, record.topic());
          struct.put(partitionFieldName, record.kafkaPartition());
          struct.put(offsetFieldName, record.kafkaOffset());
          if (record.timestamp() != null) {
            struct.put(timestampFieldName, record.timestamp());
          }
          externalFieldAppender.addToStruct(record, struct);
        }

        @Override
        public void addToMap(SinkRecord record, Map<String, Object> map) {
          map.put(topicFieldName, record.topic());
          map.put(partitionFieldName, record.kafkaPartition());
          map.put(offsetFieldName, record.kafkaOffset());
          if (record.timestamp() != null) {
            map.put(timestampFieldName, record.timestamp());
          }
          externalFieldAppender.addToMap(record, map);
        }
      };
    }
  }