in firestore-incremental-capture-pipeline/src/main/java/com/pipeline/RestorationPipeline.java [66:138]
public static void main(String[] args) {
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
  Pipeline pipeline = Pipeline.create(options);

  String project = options.getProject();
  String collectionId = options.getFirestoreCollectionId();
  String secondaryDatabase = options.getFirestoreDb();
  String datasetId = options.getBigQueryDataset();
  String tableId = options.getBigQueryTable();
  String defaultDatabase = DEFAULT_FIRESTORE_OPTIONS.getDatabaseId();
  Instant readTime = Utils.adjustDate(Instant.ofEpochSecond(options.getTimestamp()));

  // Ensure the FirestoreIO batch writes below target the secondary database;
  // the PITR read targets the default database explicitly via RunQuery.
  options.setFirestoreDb(secondaryDatabase);

  // Read from Firestore at the given timestamp (a point-in-time recovery read)
  // to form the baseline; the timestamp must fall within the database's PITR
  // retention window. The resulting PCollection holds the documents as they
  // existed at that moment.
  PCollection<Document> documentsAtReadTime =
      pipeline
          .apply("Passing the collection ID " + collectionId, Create.of(collectionId))
          .apply("Prepare the PITR query", new FirestoreHelpers.RunQuery(project, defaultDatabase))
          .apply(FirestoreIO.v1().read().runQuery().withReadTime(readTime).build())
          .apply(new FirestoreHelpers.RunQueryResponseToDocument());

  // Write the baseline documents to the secondary database.
  documentsAtReadTime
      .apply("Create the write request",
          ParDo.of(new DoFn<Document, Write>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              Document document = c.element();
              // Swap the default database id for the secondary database id in
              // the document's resource name
              // (projects/{project}/databases/{database}/documents/{path}).
              String id = document.getName().replace(defaultDatabase, secondaryDatabase);
              Document newDocument = Document.newBuilder()
                  .setName(id)
                  .putAllFields(document.getFieldsMap())
                  .build();
              c.output(Write.newBuilder()
                  .setUpdate(newDocument)
                  .build());
            }
          }))
      .apply("Write to the Firestore database instance", FirestoreIO.v1().write().batchWrite().build());

  // Read the incremental capture log from BigQuery and write those changes to
  // the secondary database as well.
  pipeline
      .apply(Create.of(""))
      .apply("Read from BigQuery",
          new IncrementalCaptureLog(project, readTime, secondaryDatabase, datasetId, tableId))
      .apply("Prepare write operations",
          // Remap document names from the default database to the secondary
          // database, mirroring the PITR stage above.
          new FirestoreHelpers.DocumentToWrite(defaultDatabase, secondaryDatabase))
      .apply("Write to the Firestore database instance (From BigQuery)",
          FirestoreIO.v1().write().batchWrite().build());
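
  // Taken together (and assuming IncrementalCaptureLog returns the changes
  // captured after readTime), the two stages rebuild the collection in the
  // secondary database: a PITR baseline at readTime, plus a replay of the
  // subsequent changes recorded in BigQuery.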

  PipelineResult result = pipeline.run();

  // Distinguish a direct run from Dataflow template creation: when a template
  // location is set, the pipeline is only being staged, not executed.
  if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
    // No template location, so the pipeline is actually running; block until
    // it finishes.
    result.waitUntilFinish();
  }
}
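
// Example launch (a sketch, not from the source: flag names follow Beam's
// convention of deriving --camelCase flags from the MyOptions getters above,
// and every value is a placeholder):
//
//   mvn compile exec:java -Dexec.mainClass=com.pipeline.RestorationPipeline \
//     -Dexec.args="--runner=DataflowRunner \
//       --project=my-project \
//       --firestoreCollectionId=users \
//       --firestoreDb=my-secondary-db \
//       --bigQueryDataset=capture_dataset \
//       --bigQueryTable=capture_table \
//       --timestamp=1700000000"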