in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java [277:322]
/**
 * Builds a {@link SourceRecord} whose value is a {@link Struct} containing the message body
 * plus one string field per message attribute.
 *
 * <p>The struct fields are, in order: the body field
 * ({@code ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD}), every entry of
 * {@code messageAttributes} except the one named by {@code kafkaMessageKeyAttribute}, and, when
 * {@code makeOrderingKeyAttribute} is set and {@code orderingKey} is non-empty, the ordering key
 * under {@code ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE}.
 *
 * <p>Attributes whose name collides with a reserved field (the body field, or the ordering-key
 * field when it is being added) are left out of the struct: {@code SchemaBuilder} rejects
 * duplicate field names, so adding them would fail the whole record. An attribute that merely
 * shares the ordering-key name while the ordering-key field is <em>not</em> being added is kept
 * with its own attribute value.
 *
 * @param messageAttributes attributes of the received message, keyed by attribute name
 * @param ack map handed to the record as its source offset
 * @param key record key (may be {@code null}; the key schema is optional string)
 * @param orderingKey ordering key of the message; may be {@code null} or empty
 * @param messageBytes message payload stored in the body field
 * @param timestamp timestamp assigned to the record
 * @return the record targeting {@code kafkaTopic} and the partition chosen by
 *     {@code selectPartition}
 */
private SourceRecord createRecordWithStruct(
    Map<String, String> messageAttributes,
    Map<String, String> ack,
    String key,
    String orderingKey,
    byte[] messageBytes,
    Long timestamp) {
  // Decide once whether the explicit ordering-key field is part of this record, so the schema
  // and the value population below cannot disagree.
  boolean includeOrderingKey =
      makeOrderingKeyAttribute && orderingKey != null && !orderingKey.isEmpty();

  SchemaBuilder valueSchemaBuilder =
      SchemaBuilder.struct()
          .field(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD, Schema.BYTES_SCHEMA);
  for (Entry<String, String> attribute : messageAttributes.entrySet()) {
    String attributeName = attribute.getKey();
    if (attributeName.equals(kafkaMessageKeyAttribute)) {
      // This attribute is carried as the record key, not as a struct field.
      continue;
    }
    if (attributeName.equals(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD)) {
      // Reserved for the payload; a second field of this name would be rejected as a duplicate.
      continue;
    }
    if (includeOrderingKey
        && attributeName.equals(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE)) {
      // The real ordering key is added below under this name and takes precedence.
      continue;
    }
    valueSchemaBuilder.field(attributeName, Schema.STRING_SCHEMA);
  }
  if (includeOrderingKey) {
    valueSchemaBuilder.field(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE, Schema.STRING_SCHEMA);
  }
  Schema valueSchema = valueSchemaBuilder.build();

  Struct value =
      new Struct(valueSchema).put(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD, messageBytes);
  for (Field field : valueSchema.fields()) {
    String fieldName = field.name();
    if (fieldName.equals(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD)) {
      // Already populated with the payload above.
      continue;
    }
    if (includeOrderingKey && fieldName.equals(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE)) {
      value.put(fieldName, orderingKey);
    } else {
      // Includes an attribute that happens to share the ordering-key name when the explicit
      // ordering-key field is not in use: it keeps its own attribute value.
      value.put(fieldName, messageAttributes.get(fieldName));
    }
  }

  return new SourceRecord(
      null,
      ack,
      kafkaTopic,
      selectPartition(key, value, orderingKey),
      Schema.OPTIONAL_STRING_SCHEMA,
      key,
      valueSchema,
      value,
      timestamp);
}