public PCollection expand()

in firestore-incremental-capture-pipeline/src/main/java/com/pipeline/FirestoreHelpers.java [48:84]


    /**
     * Maps each incoming collection ID to a Firestore {@code RunQueryRequest} whose parent is
     * {@code baseDocumentPath}.
     *
     * <p>An element equal to {@code "*"} produces a request carrying a {@code StructuredQuery}
     * with no {@code from} clause. Any other element produces a query with a single
     * {@code CollectionSelector} whose collection ID is that element.
     *
     * <p>{@code baseDocumentPath} is logged once each time this method is invoked.
     *
     * @param input collection IDs, one per element
     * @return one {@code RunQueryRequest} per input element
     */
    public PCollection<RunQueryRequest> expand(PCollection<String> input) {
      LOG.info(baseDocumentPath);

      final DoFn<String, RunQueryRequest> toRunQueryRequest =
          new DoFn<String, RunQueryRequest>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
              final String requestedCollection = context.element();
              final StructuredQuery.Builder query = StructuredQuery.newBuilder();

              if (requestedCollection.equals("*")) {
                // Wildcard: the query is left without any collection selector.
                LOG.info("Querying all collections");
              } else {
                // Named collection: restrict the query to that single collection ID.
                query.addFrom(
                    CollectionSelector.newBuilder()
                        .setCollectionId(requestedCollection)
                        .build());
              }

              // Both cases share the same parent; only the structured query differs.
              context.output(
                  RunQueryRequest.newBuilder()
                      .setParent(baseDocumentPath)
                      .setStructuredQuery(query.build())
                      .build());
            }
          };

      return input.apply(ParDo.of(toRunQueryRequest));
    }