in ingestion-beam/src/main/java/com/mozilla/telemetry/io/Read.java [122:200]
public PCollection<PubsubMessage> expand(PBegin input) {
  // Build a BigQuery read that converts each row into a PubsubMessage whose
  // payload comes from the BYTES "payload" column and whose attributes come
  // from the remaining scalar columns.
  BigQueryIO.TypedRead<PubsubMessage> typedRead = BigQueryIO
      .read((SchemaAndRecord schemaAndRecord) -> {
        TableSchema schema = schemaAndRecord.getTableSchema();
        GenericRecord row = schemaAndRecord.getRecord();

        // Copy exactly the bytes in [0, limit) and no more; see
        // https://github.com/mozilla/gcp-ingestion/issues/1266
        ByteBuffer payloadBuffer = (ByteBuffer) row.get(FieldName.PAYLOAD);
        final byte[] payloadBytes;
        if (payloadBuffer == null) {
          payloadBytes = new byte[0];
        } else {
          payloadBytes = new byte[payloadBuffer.limit()];
          payloadBuffer.get(payloadBytes);
        }

        // Attributes are populated from every non-repeated simple string and
        // timestamp field, which covers raw and error tables completely.
        // Decoded payload tables additionally carry a top-level "metadata"
        // struct; that struct is mostly redundant with the encoded payload,
        // but we pull out the document namespace, type, and version because a
        // Sink job that never inspects the payload still needs them to route
        // messages to the correct destination.
        Map<String, String> attributes = new HashMap<>();
        schema.getFields().forEach(field -> {
          if ("REPEATED".equals(field.getMode())) {
            return;
          }
          Object value = row.get(field.getName());
          if (value == null) {
            return;
          }
          String type = field.getType();
          if ("TIMESTAMP".equals(type)) {
            attributes.put(field.getName(), Time.epochMicrosToTimestamp((Long) value));
          } else if ("STRING".equals(type) || "INTEGER".equals(type) || "INT64".equals(type)) {
            attributes.put(field.getName(), value.toString());
          } else if ("RECORD".equals(type) || "STRUCT".equals(type)) {
            // The only struct we expect is the top-level "metadata"; extract
            // just the attributes needed for destination routing.
            // NOTE(review): assumes these three metadata subfields are always
            // non-null in decoded tables; a null here would NPE — confirm
            // against the table schema.
            GenericRecord metadata = (GenericRecord) value;
            for (String key : Arrays.asList(Attribute.DOCUMENT_NAMESPACE,
                Attribute.DOCUMENT_TYPE, Attribute.DOCUMENT_VERSION)) {
              attributes.put(key, metadata.get(key).toString());
            }
          }
          // All other types (notably the payload BYTES field) are skipped.
        });
        return new PubsubMessage(payloadBytes, attributes);
      }) //
      .withCoder(PubsubMessageWithAttributesCoder.of()) //
      .withTemplateCompatibility() //
      .withoutValidation() //
      .withMethod(method.method);

  // TABLE reads directly from the table spec; anything else is treated as a
  // standard-SQL query (matching the original default/QUERY fall-through).
  if (source == Source.TABLE) {
    typedRead = typedRead.from(tableSpec);
  } else {
    typedRead = typedRead.fromQuery(tableSpec).usingStandardSql();
  }

  // Row restriction and field selection are only honored for Storage API
  // reads of a concrete table.
  if (source == Source.TABLE && method == BigQueryReadMethod.storageapi) {
    if (rowRestriction != null) {
      typedRead = typedRead.withRowRestriction(rowRestriction);
    }
    if (selectedFields != null) {
      typedRead = typedRead.withSelectedFields(selectedFields);
    }
  }
  return input.apply(typedRead);
}