in src/main/java/com/google/cloud/solutions/autotokenize/common/TransformingReader.java [184:211]
/**
 * Reads records from the configured source and packages them into a {@link PCollectionTuple}.
 *
 * <p>The read/convert transform yields {@link FlatRecord}s paired with a string value. That value
 * is treated as the record's schema. The records are always placed under {@link #recordsTag()}.
 * When {@link #avroSchemaTag()} is non-null, one arbitrarily sampled schema string is also
 * placed under that tag.
 *
 * @param pBegin the pipeline root the read is applied to
 * @return a tuple containing the records and, optionally, a single schema string
 * @throws NullPointerException if {@link #recordsTag()} is null (via {@code checkNotNull})
 */
public PCollectionTuple expand(PBegin pBegin) {
  checkNotNull(recordsTag(), "Provide a TupleTag for retrieving FlatRecord");

  var readStepName = "Read" + SourceNames.forType(sourceType()).asCamelCase();
  PCollection<KV<FlatRecord, String>> recordSchemaPairs =
      pBegin.apply(readStepName, readAndConvertTransform());

  var flatRecordCoder = ProtoCoder.of(FlatRecord.class);
  recordSchemaPairs.setCoder(KvCoder.of(flatRecordCoder, StringUtf8Coder.of()));

  PCollection<FlatRecord> extractedRecords =
      recordSchemaPairs.apply("ExtractRecords", Keys.create());
  extractedRecords.setCoder(flatRecordCoder);

  var outputTuple = PCollectionTuple.of(recordsTag(), extractedRecords);

  // Without a schema tag the caller only wants the records.
  var schemaTag = avroSchemaTag();
  if (schemaTag == null) {
    return outputTuple;
  }

  PCollection<String> schemaStrings = recordSchemaPairs.apply("ExtractSchema", Values.create());
  schemaStrings.setCoder(StringUtf8Coder.of());

  // Every record carries a schema string; keep just one of them (arbitrary choice).
  PCollection<String> sampledSchema = schemaStrings.apply(Sample.any(1));
  return outputTuple.and(schemaTag, sampledSchema);
}