export async function launchJob()

in firestore-incremental-capture/functions/src/dataflow/trigger_dataflow_job.ts [26:71]


export async function launchJob(timestamp: number) {
  const projectId = config.projectId;
  const serverTimestamp = Timestamp.now().toMillis();
  const {syncCollectionPath} = config;

  const runId = `${config.instanceId}-dataflow-run-${serverTimestamp}`;

  logger.info(`Launching job ${runId}`, {
    labels: {run_id: runId},
  });

  const runDoc = admin.firestore().doc(`restore/${runId}`);

  // Extract the database name from the backup instance name
  const values = config.backupInstanceName.split('/');
  const firestoreDb = values[values.length - 1];

  /** Select the correct collection Id for apache beam */
  const firestoreCollectionId =
    syncCollectionPath === '{document=**}' ? '*' : syncCollectionPath;

  const [response] = await dataflowClient.launchFlexTemplate({
    projectId,
    location: config.location,
    launchParameter: {
      jobName: runId,
      parameters: {
        timestamp: timestamp.toString(),
        firestoreCollectionId,
        firestoreDb,
        bigQueryDataset: config.bqDataset,
        bigQueryTable: config.bqtable,
      },
      containerSpecGcsPath: `gs://${config.bucketName}/${config.instanceId}-dataflow-restore`,
    },
  });

  await runDoc.set({status: 'export triggered', runId: runId});

  logger.info(`Launched job named ${response.job?.name} successfully`, {
    job_response: response,
    labels: {run_id: runId},
  });

  return response;
}