in firestore-incremental-capture-pipeline/src/main/java/com/pipeline/FirestoreHelpers.java [48:84]
public PCollection<RunQueryRequest> expand(PCollection<String> input) {
  // Converts every incoming collection id into exactly one RunQueryRequest whose
  // parent is baseDocumentPath. The base path is logged once, when the transform
  // is expanded.
  LOG.info(baseDocumentPath);

  final DoFn<String, RunQueryRequest> toQueryRequest =
      new DoFn<String, RunQueryRequest>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
          final String requestedCollection = context.element();
          final StructuredQuery.Builder query = StructuredQuery.newBuilder();

          if (requestedCollection.equals("*")) {
            // Wildcard: the structured query is deliberately left without a
            // `from` clause.
            // NOTE(review): presumably the backend treats a query with no
            // collection selector as "everything under the parent" - confirm.
            LOG.info("Querying all collections");
          } else {
            // Any other value is used verbatim as the id of the single
            // collection to select.
            query.addFrom(
                CollectionSelector.newBuilder()
                    .setCollectionId(requestedCollection)
                    .build());
          }

          // Both cases share the same parent and emit a single request.
          context.output(
              RunQueryRequest.newBuilder()
                  .setParent(baseDocumentPath)
                  .setStructuredQuery(query.build())
                  .build());
        }
      };

  return input.apply(ParDo.of(toQueryRequest));
}