public static void main()

in firestore-incremental-capture-pipeline/src/main/java/com/pipeline/RestorationPipeline.java [66:138]


  /**
   * Builds and runs the restoration pipeline.
   *
   * <p>Two independent branches are attached to the same pipeline, each ending in a
   * Firestore batch write:
   * <ol>
   *   <li>a query of the configured collection at the requested read time, whose
   *       documents are re-addressed from the default database to the secondary
   *       database;</li>
   *   <li>the {@code IncrementalCaptureLog} transform (given the same read time and the
   *       BigQuery dataset/table options) followed by
   *       {@code FirestoreHelpers.DocumentToWrite}.</li>
   * </ol>
   *
   * <p>NOTE(review): no explicit ordering between the writes of the two branches is
   * expressed in this method - confirm that this is acceptable for the restore.
   *
   * @param args command-line arguments, parsed and validated into {@code MyOptions}
   */
  public static void main(String[] args) {

    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    String project = options.getProject();
    String collectionId = options.getFirestoreCollectionId();
    String secondaryDatabase = options.getFirestoreDb();
    String datasetId = options.getBigQueryDataset();
    String tableId = options.getBigQueryTable();
    String defaultDatabase = DEFAULT_FIRESTORE_OPTIONS.getDatabaseId();
    // The requested epoch-second timestamp, passed through Utils.adjustDate
    Instant readTime = Utils.adjustDate(Instant.ofEpochSecond(options.getTimestamp()));

    // NOTE(review): this stores back the value that was just read from the same
    // option; it looks like a no-op unless the setter has side effects - confirm.
    options.setFirestoreDb(secondaryDatabase);

    // Only the database segment of a document name may be rewritten. A blanket
    // String.replace of the database id would also alter collection or document
    // ids that happen to contain the same text.
    final String sourceDatabaseSegment = "/databases/" + defaultDatabase + "/documents/";
    final String targetDatabaseSegment = "/databases/" + secondaryDatabase + "/documents/";

    // Read from Firestore at the specified timestamp to form the baseline.
    // The returned PCollection contains the documents at the specified timestamp
    // in Firestore.
    PCollection<Document> documentsAtReadTime = pipeline
        .apply("Passing the collection ID " + collectionId, Create.of(collectionId))
        .apply("Prepare the PITR query", new FirestoreHelpers.RunQuery(project, defaultDatabase))
        .apply(
            FirestoreIO.v1()
                .read()
                .runQuery()
                .withReadTime(readTime)
                .build())
        .apply(new FirestoreHelpers.RunQueryResponseToDocument());

    // Write the documents to the secondary database
    documentsAtReadTime
        .apply("Create the write request",
            ParDo.of(new DoFn<Document, Write>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                Document document = c.element();
                String name = document.getName();

                // Swap the default database for the secondary one, touching only
                // the first "/databases/<id>/documents/" segment of the name. A
                // name without that segment is passed through unchanged.
                int segmentStart = name.indexOf(sourceDatabaseSegment);
                String id = segmentStart < 0
                    ? name
                    : name.substring(0, segmentStart)
                        + targetDatabaseSegment
                        + name.substring(segmentStart + sourceDatabaseSegment.length());

                // Copy the fields onto a document carrying the rewritten name
                Document newDocument = Document.newBuilder()
                    .setName(id)
                    .putAllFields(document.getFieldsMap())
                    .build();

                c.output(Write.newBuilder()
                    .setUpdate(newDocument)
                    .build());
              }
            }))
        .apply("Write to the Firestore database instance", FirestoreIO.v1().write().batchWrite().build());

    // BigQuery read and subsequent Firestore write.
    // The single empty string only seeds the branch with one element.
    // NOTE(review): DocumentToWrite receives defaultDatabase for both arguments
    // while IncrementalCaptureLog receives secondaryDatabase - confirm against the
    // DocumentToWrite constructor that this is intended.
    pipeline
        .apply(Create.of(""))
        .apply("Read from BigQuery",
            new IncrementalCaptureLog(project, readTime, secondaryDatabase, datasetId, tableId))
        .apply("Prepare write operations",
            new FirestoreHelpers.DocumentToWrite(defaultDatabase, defaultDatabase))
        .apply("Write to the Firestore database instance (From BigQuery)",
            FirestoreIO.v1().write().batchWrite().build());

    PipelineResult result = pipeline.run();

    // We try to identify if the pipeline is being run or a template is being
    // created
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      // If template location is null, then the pipeline is being run, so we can
      // wait until it finishes
      result.waitUntilFinish();
    }
  }