public PCollectionTuple expand()

in src/main/java/com/google/cloud/solutions/autotokenize/common/TransformingReader.java [184:211]


  /**
   * Expands this transform into its read stage and bundles the results into a tuple.
   *
   * <p>The read-and-convert transform produces {@code KV<FlatRecord, String>} pairs. The keys are
   * always published under {@code recordsTag()}. When {@code avroSchemaTag()} is non-null, the
   * values (presumably the source's Avro schema string; confirm against
   * {@code readAndConvertTransform()}) are reduced with {@code Sample.any(1)} and published under
   * that tag too.
   *
   * <p>{@code recordsTag()} must be non-null; this is enforced up front via {@code checkNotNull}.
   *
   * @param pBegin the pipeline start to which the read transform is applied
   * @return a tuple with the flat records and, when a schema tag is configured, the sampled
   *     schema values
   */
  public PCollectionTuple expand(PBegin pBegin) {
    checkNotNull(recordsTag(), "Provide a TupleTag for retrieving FlatRecord");

    // Step name is derived from the configured source type, e.g. "Read" + camel-cased name.
    String readStepName = "Read" + SourceNames.forType(sourceType()).asCamelCase();

    PCollection<KV<FlatRecord, String>> recordSchemaPairs =
        pBegin
            .apply(readStepName, readAndConvertTransform())
            .setCoder(KvCoder.of(ProtoCoder.of(FlatRecord.class), StringUtf8Coder.of()));

    // Keys of the pairs are the records themselves.
    PCollection<FlatRecord> recordsOnly =
        recordSchemaPairs
            .apply("ExtractRecords", Keys.create())
            .setCoder(ProtoCoder.of(FlatRecord.class));

    var output = PCollectionTuple.of(recordsTag(), recordsOnly);

    // Without a schema tag there is nothing further to attach.
    if (avroSchemaTag() == null) {
      return output;
    }

    // Values of the pairs are the per-record strings; keep at most one of them.
    PCollection<String> schemaValues =
        recordSchemaPairs.apply("ExtractSchema", Values.create()).setCoder(StringUtf8Coder.of());
    PCollection<String> sampledSchema = schemaValues.apply(Sample.any(1));

    return output.and(avroSchemaTag(), sampledSchema);
  }