in services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoter.java [95:216]
public GCSSnapshoterResponse execute(SnapshoterRequest request, Timestamp operationTs, String pubSubMessageId) throws IOException, NonRetryableApplicationException, InterruptedException {
// run common service start logging and checks
Utils.runServiceStartRoutines(
logger,
request,
persistentSet,
persistentSetObjectPrefix,
request.getTrackingId()
);
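// the start routines presumably also guard against re-processing pubsub messages that were already handled, mirroring the end routines below that add the message to the processed list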
// validate required input
validateRequest(request);
// Perform the Snapshot operation using the BigQuery service
// time travel is calculated relative to the operation time
Tuple<TableSpec, Long> sourceTableWithTimeTravelTuple = Utils.getTableSpecWithTimeTravel(
request.getTargetTable(),
request.getBackupPolicyAndState().getTimeTravelOffsetDays(),
operationTs
);
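// x() is the source table spec resolved for time travel, y() is the time travel point as epoch millis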
// construct the backup folder for this run in the format project/dataset/table/trackingid/timetravelmillis/exportformat
String backupFolder = String.format("%s/%s/%s/%s/%s/%s",
request.getTargetTable().getProject(),
request.getTargetTable().getDataset(),
request.getTargetTable().getTable(),
request.getTrackingId(),
sourceTableWithTimeTravelTuple.y(), // include the time travel timestamp (epoch millis) for transparency
request.getBackupPolicyAndState().getGcsExportFormat()
);
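// e.g. my_project/my_dataset/my_table/<tracking_id>/1672531200000/AVRO (illustrative values)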
String gcsDestinationUri = prepareGcsUriForMultiFileExport(
request.getBackupPolicyAndState().getGcsSnapshotStorageLocation(),
backupFolder
);
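// multi-file export URIs typically end with a wildcard so BigQuery can shard exports larger than 1 GB across multiple files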
Timestamp timeTravelTs = Timestamp.ofTimeSecondsAndNanos(sourceTableWithTimeTravelTuple.y() / 1000, 0);
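// convert the epoch-millis time travel point to a Timestamp for logging (sub-second precision is dropped)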
logger.logInfoWithTracker(
request.isDryRun(),
request.getTrackingId(),
request.getTargetTable(),
String.format("Will take a GCS Snapshot for '%s' to '%s' with time travel timestamp '%s' (%s days)",
request.getTargetTable().toSqlString(),
gcsDestinationUri,
timeTravelTs,
request.getBackupPolicyAndState().getTimeTravelOffsetDays().getText()
)
);
if(!request.isDryRun()){
// create an async bq export job
String jobId = TrackingHelper.generateBQExportJobId(request.getTrackingId(), config.getApplicationName());
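// the job id embeds the tracking id and application name so the export job can be traced back to this run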
// We create the tagging request and add it to persistent storage
// The Tagger service will receive notifications of export job completion via log sinks and pick up the tagger request from persistent storage
// Make sure the request file is stored before submitting the export job. If file creation fails with a non-fatal error and the request is retried, we then avoid re-running the export job
TaggerRequest taggerRequest = new TaggerRequest(
request.getTargetTable(),
request.getRunId(),
request.getTrackingId(),
request.isDryRun(),
request.getBackupPolicyAndState(),
BackupMethod.GCS_SNAPSHOT,
null,
gcsDestinationUri,
operationTs
);
String taggerRequestFile = String.format("%s/%s", persistentMapObjectPrefix, jobId);
persistentMap.put(taggerRequestFile, taggerRequest.toJsonString());
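// the stored request is keyed by the export job id so the Tagger can look it up when the job-completion log event arrives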
Map<String, String> jobLabels = new HashMap<>();
// labels have to be max 63 chars and contain only lowercase letters, numeric characters, underscores, and dashes. All characters must use UTF-8 encoding, and international characters are allowed.
jobLabels.put("app", config.getApplicationName());
// API call: submit the export job to BigQuery
bqService.exportToGCS(
jobId,
sourceTableWithTimeTravelTuple.x(),
gcsDestinationUri,
request.getBackupPolicyAndState().getGcsExportFormat(),
request.getBackupPolicyAndState().getGcsCsvDelimiter(),
request.getBackupPolicyAndState().getGcsCsvExportHeader(),
request.getBackupPolicyAndState().getGcsUseAvroLogicalTypes(),
request.getTrackingId(),
jobLabels
);
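// the export (extract) job runs asynchronously; completion is detected downstream via the log sink and handled by the Tagger service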
}
logger.logInfoWithTracker(
request.isDryRun(),
request.getTrackingId(),
request.getTargetTable(),
String.format("BigQuery GCS export submitted for table %s to %s",
request.getTargetTable().toSqlString(),
gcsDestinationUri
)
);
// run common service end logging and add the pubsub message to the processed list
Utils.runServiceEndRoutines(
logger,
request,
persistentSet,
persistentSetObjectPrefix,
request.getTrackingId()
);
return new GCSSnapshoterResponse(
request.getTargetTable(),
request.getRunId(),
request.getTrackingId(),
request.isDryRun(),
operationTs,
sourceTableWithTimeTravelTuple.x()
);
}