in sql-streaming-copier/src/main/java/com/google/cloud/pubsub/sql/providers/CloudPubsubProvider.java [30:51]
/**
 * Converts {@code input} into a new row built against {@code SCHEMA}.
 *
 * <p>Every entry of the input's {@code "attributes"} array is flattened into a single string
 * attribute: the entry's values are decoded as UTF-8 and joined with {@code "|"}. When the
 * message key is non-empty it is added as one more attribute, stored under {@code
 * ROW_MESSAGE_KEY_ATTRIBUTE}. The payload and the event timestamp are copied over as-is.
 *
 * @param input row exposing the {@code "attributes"}, {@code "payload"} and {@code
 *     "event_timestamp"} fields plus the {@code Rows.MESSAGE_KEY_FIELD} bytes field
 * @return a row with the payload, the flattened attribute map and the event timestamp
 */
private static Row toPubsubRow(Row input) {
  // NOTE(review): ImmutableMap.Builder rejects duplicate keys when built, so a repeated
  // attribute key (or one equal to ROW_MESSAGE_KEY_ATTRIBUTE alongside a non-empty message
  // key) would fail at build() below -- confirm the input can never contain such keys.
  ImmutableMap.Builder<String, String> flattened = ImmutableMap.builder();
  for (Object element : input.getArray("attributes")) {
    Row attribute = (Row) element;
    flattened.put(attribute.getString("key"), joinAttributeValues(attribute));
  }

  // An empty message key is treated as "no key" and is not forwarded as an attribute.
  // NOTE(review): assumes the message key field is never null -- confirm against the schema.
  byte[] messageKey = input.getBytes(Rows.MESSAGE_KEY_FIELD);
  if (messageKey.length > 0) {
    flattened.put(ROW_MESSAGE_KEY_ATTRIBUTE, new String(messageKey, UTF_8));
  }

  return Row.withSchema(SCHEMA)
      .withFieldValue(Rows.PAYLOAD_FIELD, input.getBytes("payload"))
      .withFieldValue(Rows.ATTRIBUTES_FIELD, flattened.build())
      .withFieldValue(Rows.EVENT_TIMESTAMP_FIELD, input.getDateTime("event_timestamp"))
      .build();
}

/**
 * Decodes each element of the {@code "values"} array of {@code attribute} as a UTF-8 string and
 * concatenates the results, separated by {@code "|"}.
 *
 * @param attribute attribute entry row whose {@code "values"} elements are byte arrays
 * @return the joined values, or an empty string when there are none
 */
private static String joinAttributeValues(Row attribute) {
  StringBuilder joined = new StringBuilder();
  boolean needsSeparator = false;
  for (Object raw : attribute.getArray("values")) {
    String decoded = new String((byte[]) raw, UTF_8);
    if (needsSeparator) {
      joined.append('|');
    }
    joined.append(decoded);
    needsSeparator = true;
  }
  return joined.toString();
}