in kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/KafkaMetadataTransform.java [128:228]
private static RecordAppender getRecordAppender(SimpleConfig config) {
RecordAppender externalFieldAppender;
String metadataFieldName = config.getString(KEY_METADATA_FIELD_NAME);
Boolean nestedMetadata = config.getBoolean(KEY_METADATA_IS_NESTED);
String topicFieldName;
String partitionFieldName;
String offsetFieldName;
String timestampFieldName;
if (nestedMetadata) {
externalFieldAppender =
getExternalFieldAppender(config.getString(EXTERNAL_KAFKA_METADATA), name -> name);
SchemaBuilder nestedSchemaBuilder = SchemaBuilder.struct();
nestedSchemaBuilder
.field(TOPIC, Schema.STRING_SCHEMA)
.field(PARTITION, Schema.INT32_SCHEMA)
.field(OFFSET, Schema.INT64_SCHEMA)
.field(TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA);
externalFieldAppender.addToSchema(nestedSchemaBuilder);
Schema nestedSchema = nestedSchemaBuilder.build();
return new RecordAppender() {
@Override
public void addToSchema(SchemaBuilder builder) {
builder.field(metadataFieldName, nestedSchema);
}
@Override
public void addToStruct(SinkRecord record, Struct struct) {
Struct nested = new Struct(nestedSchema);
nested.put(TOPIC, record.topic());
nested.put(PARTITION, record.kafkaPartition());
nested.put(OFFSET, record.kafkaOffset());
if (record.timestamp() != null) {
nested.put(TIMESTAMP, record.timestamp());
}
externalFieldAppender.addToStruct(record, nested);
struct.put(metadataFieldName, nested);
}
@Override
public void addToMap(SinkRecord record, Map<String, Object> map) {
Map<String, Object> nested = Maps.newHashMap();
nested.put(TOPIC, record.topic());
nested.put(PARTITION, record.kafkaPartition());
nested.put(OFFSET, record.kafkaOffset());
if (record.timestamp() != null) {
nested.put(TIMESTAMP, record.timestamp());
}
externalFieldAppender.addToMap(record, nested);
map.put(metadataFieldName, nested);
}
};
} else {
Function<String, String> namer = name -> String.format("%s_%s", metadataFieldName, name);
topicFieldName = namer.apply(TOPIC);
partitionFieldName = namer.apply(PARTITION);
offsetFieldName = namer.apply(OFFSET);
timestampFieldName = namer.apply(TIMESTAMP);
externalFieldAppender =
getExternalFieldAppender(config.getString(EXTERNAL_KAFKA_METADATA), namer);
return new RecordAppender() {
@Override
public void addToSchema(SchemaBuilder builder) {
builder
.field(topicFieldName, Schema.STRING_SCHEMA)
.field(partitionFieldName, Schema.INT32_SCHEMA)
.field(offsetFieldName, Schema.INT64_SCHEMA)
.field(timestampFieldName, Schema.OPTIONAL_INT64_SCHEMA);
externalFieldAppender.addToSchema(builder);
}
@Override
public void addToStruct(SinkRecord record, Struct struct) {
struct.put(topicFieldName, record.topic());
struct.put(partitionFieldName, record.kafkaPartition());
struct.put(offsetFieldName, record.kafkaOffset());
if (record.timestamp() != null) {
struct.put(timestampFieldName, record.timestamp());
}
externalFieldAppender.addToStruct(record, struct);
}
@Override
public void addToMap(SinkRecord record, Map<String, Object> map) {
map.put(topicFieldName, record.topic());
map.put(partitionFieldName, record.kafkaPartition());
map.put(offsetFieldName, record.kafkaOffset());
if (record.timestamp() != null) {
map.put(timestampFieldName, record.timestamp());
}
externalFieldAppender.addToMap(record, map);
}
};
}
}