in services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/BigQuerySnapshoter.java [94:218]
/**
 * Takes a BigQuery snapshot of the request's target table and forwards a tagging request for it.
 *
 * <p>The method performs these steps, in order:
 * <ol>
 *   <li>runs the shared service-start routines and validates the request;</li>
 *   <li>derives the time travel source table, the snapshot expiry and the snapshot table spec,
 *       all relative to {@code operationTs};</li>
 *   <li>calls {@code bqService.createSnapshot(...)} unless the request is a dry run;</li>
 *   <li>builds a {@link TaggerRequest} and publishes it to the configured output topic. Messages
 *       that fail to publish are logged as warnings; this method does not throw for them;</li>
 *   <li>runs the shared service-end routines.</li>
 * </ol>
 *
 * @param request         the snapshot request (target table, run/tracking ids, dry-run flag, backup policy)
 * @param operationTs     reference point used for both the time travel offset and the expiry date
 * @param pubSubMessageId id of the triggering PubSub message; not read by this method body
 * @return a response carrying the request identifiers, the time travel source table, the snapshot
 *         table, the tagger request and the PubSub publish results
 * @throws IOException                      declared; raised by the collaborators invoked here
 * @throws NonRetryableApplicationException declared; raised by the collaborators invoked here
 * @throws InterruptedException             declared; raised by the collaborators invoked here
 * @throws RetryableApplicationException    declared; raised by the collaborators invoked here
 */
public BigQuerySnapshoterResponse execute(SnapshoterRequest request, Timestamp operationTs, String pubSubMessageId) throws IOException, NonRetryableApplicationException, InterruptedException, RetryableApplicationException {

    // Shared start-of-service logging and checks.
    Utils.runServiceStartRoutines(logger, request, persistentSet, persistentSetObjectPrefix, request.getTrackingId());

    // Make sure the required parameters are present before doing any work.
    validateInput(request);

    // Time travel is resolved against the operation time. The tuple carries the source table to
    // snapshot from (x) and the time travel point (y), which is treated as milliseconds below.
    Tuple<TableSpec, Long> timeTravel = Utils.getTableSpecWithTimeTravel(
            request.getTargetTable(),
            request.getBackupPolicyAndState().getTimeTravelOffsetDays(),
            operationTs);
    Long timeTravelMillis = timeTravel.y();

    // Expiry = operation time + configured number of expiration days (converted to seconds).
    long retentionSeconds =
            request.getBackupPolicyAndState().getBigQuerySnapshotExpirationDays().longValue() * Utils.SECONDS_IN_DAY;
    Timestamp snapshotExpiry = Timestamp.ofTimeSecondsAndNanos(
            operationTs.getSeconds() + retentionSeconds,
            operationTs.getNanos());

    // Destination table, built from the request parameters and the calculated time travel point.
    TableSpec snapshotSpec = getSnapshotTableSpec(
            request.getTargetTable(),
            request.getBackupPolicyAndState().getBackupStorageProject(),
            request.getBackupPolicyAndState().getBigQuerySnapshotStorageDataset(),
            request.getRunId(),
            timeTravelMillis);

    // Second-granularity rendering of the time travel point; only used in the log line below.
    Timestamp timeTravelForLog = Timestamp.ofTimeSecondsAndNanos(timeTravelMillis / 1000, 0);

    String planMessage = String.format("Will take a BQ Snapshot for '%s' to '%s' with time travel timestamp '%s' (%s days) expiring on '%s'",
            request.getTargetTable().toSqlString(),
            snapshotSpec.toSqlString(),
            timeTravelForLog.toString(),
            request.getBackupPolicyAndState().getTimeTravelOffsetDays().getText(),
            snapshotExpiry.toString());
    logger.logInfoWithTracker(request.isDryRun(), request.getTrackingId(), request.getTargetTable(), planMessage);

    // The BigQuery API is only called for real runs; a dry run skips straight to logging/tagging.
    if (!request.isDryRun()) {
        String snapshotJobId = TrackingHelper.generateBQSnapshotJobId(request.getTrackingId(), config.getApplicationName());
        bqService.createSnapshot(snapshotJobId, timeTravel.x(), snapshotSpec, snapshotExpiry, request.getTrackingId());
    }

    String completedMessage = String.format("BigQuery snapshot completed for table %s to %s",
            request.getTargetTable().toSqlString(),
            snapshotSpec.toSqlString());
    logger.logInfoWithTracker(request.isDryRun(), request.getTrackingId(), request.getTargetTable(), completedMessage);

    // Describe the finished snapshot to the Tagger.
    TaggerRequest tagRequest = new TaggerRequest(
            request.getTargetTable(),
            request.getRunId(),
            request.getTrackingId(),
            request.isDryRun(),
            request.getBackupPolicyAndState(),
            BackupMethod.BIGQUERY_SNAPSHOT,
            snapshotSpec,
            null,
            operationTs);

    // Send the single tagging request to the output topic.
    PubSubPublishResults publishOutcome = pubSubService.publishTableOperationRequests(
            config.getProjectId(),
            config.getOutputTopic(),
            Arrays.asList(tagRequest));

    // Publish failures are reported as warnings only.
    for (FailedPubSubMessage failed : publishOutcome.getFailedMessages()) {
        logger.logWarnWithTracker(
                request.isDryRun(),
                request.getTrackingId(),
                request.getTargetTable(),
                String.format("Failed to publish this message %s", failed.toString()));
    }

    for (SuccessPubSubMessage published : publishOutcome.getSuccessMessages()) {
        logger.logInfoWithTracker(
                request.isDryRun(),
                request.getTrackingId(),
                request.getTargetTable(),
                String.format("Published this message %s", published.toString()));
    }

    // Shared end-of-service routines (end logging and processed-message bookkeeping, per the
    // helper's intent as described at this call site).
    Utils.runServiceEndRoutines(logger, request, persistentSet, persistentSetObjectPrefix, request.getTrackingId());

    return new BigQuerySnapshoterResponse(
            request.getTargetTable(),
            request.getRunId(),
            request.getTrackingId(),
            request.isDryRun(),
            operationTs,
            timeTravel.x(),
            snapshotSpec,
            tagRequest,
            publishOutcome);
}