in services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/Configurator.java [88:247]
/**
 * Decides whether the target table of {@code request} should be backed up in this run and,
 * if so, publishes the prepared snapshot request(s) to the BigQuery and GCS snapshoter topics.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>run the common service start routines;</li>
 *   <li>resolve the backup policy (and its source) for the target table;</li>
 *   <li>evaluate the policy CRON expression against the request's reference timestamp;</li>
 *   <li>compare the table creation time with the time travel timestamp;</li>
 *   <li>combine force-run, CRON and creation-time checks into the final backup decision;</li>
 *   <li>when a backup is due, prepare and publish the snapshot requests and log the outcome;</li>
 *   <li>run the common service end routines and build the response.</li>
 * </ol>
 *
 * @param request         the configurator request describing the target table and run
 * @param pubSubMessageId id of the triggering PubSub message; not read by this method body
 *                        (the start/end routines are passed {@code request.getTrackingId()})
 * @return a response carrying the policy, the individual decision flags, the prepared snapshot
 *         requests and the publish results; the request and publish-result fields are
 *         {@code null} when no backup is taken in this run
 * @throws IOException                      propagated from the calls made by this method
 * @throws NonRetryableApplicationException propagated from the calls made by this method
 * @throws InterruptedException             propagated from the calls made by this method
 */
public ConfiguratorResponse execute(ConfiguratorRequest request, String pubSubMessageId) throws IOException, NonRetryableApplicationException, InterruptedException {

    // run common service start logging and checks
    Utils.runServiceStartRoutines(
            logger,
            request,
            persistentSet,
            persistentSetObjectPrefix,
            request.getTrackingId()
    );

    // 1. Find the backup policy of this table (x = policy and state, y = source of the policy)
    Tuple<BackupPolicyAndState, String> backupPolicyTuple = getBackupPolicyAndState(request);
    BackupPolicyAndState backupPolicy = backupPolicyTuple.x();

    // 2a. Determine if we should take a backup at this run given the policy CRON expression
    // if the table has been backed up before then check if we should backup at this run
    boolean isBackupCronTime = isBackupCronTime(
            request.getTargetTable(),
            backupPolicy.getCron(),
            request.getRefTimestamp(),
            backupPolicy.getConfigSource(),
            backupPolicy.getLastBackupAt(),
            logger,
            request.getTrackingId()
    );

    // 2b. Check if the table has been created before the desired time travel
    Tuple<TableSpec, Long> sourceTableWithTimeTravelTuple = Utils.getTableSpecWithTimeTravel(
            request.getTargetTable(),
            backupPolicy.getTimeTravelOffsetDays(),
            request.getRefTimestamp()
    );

    // API Call
    Long tableCreationTime = bqService.getTableCreationTime(request.getTargetTable());

    // true only when the table was created strictly before the time travel timestamp
    // NOTE(review): assumes both values use the same time unit - confirm against the helpers
    boolean isTableCreatedBeforeTimeTravel = tableCreationTime < sourceTableWithTimeTravelTuple.y();

    // To take a backup, the backup cron expression must match this run and the table has to be around for
    // enough time to apply the desired time travel. Otherwise, skip the backup this run
    boolean isBackupTime = isBackupTime(request.isForceRun(), isBackupCronTime, isTableCreatedBeforeTimeTravel);

    logger.logInfoWithTracker(
            request.isDryRun(),
            request.getTrackingId(),
            request.getTargetTable(),
            String.format("isBackupTime for this run is '%s'. Calculated based on isForceRun=%s, isBackupCronTime=%s, isTableCreatedBeforeTimeTravel=%s ",
                    isBackupTime, request.isForceRun(), isBackupCronTime, isTableCreatedBeforeTimeTravel)
    );

    // 3. Prepare and send the backup request(s) if required.
    // These stay null when no backup is taken and are passed to the response as-is.
    SnapshoterRequest bqSnapshotRequest = null;
    SnapshoterRequest gcsSnapshotRequest = null;
    PubSubPublishResults bqSnapshotPublishResults = null;
    PubSubPublishResults gcsSnapshotPublishResults = null;

    if (isBackupTime) {
        // x = BigQuery snapshot request, y = GCS snapshot request; either may be null
        Tuple<SnapshoterRequest, SnapshoterRequest> snapshotRequestsTuple = prepareSnapshotRequests(
                backupPolicy,
                request
        );

        List<JsonMessage> bqSnapshotRequests = new ArrayList<>(1);
        if (snapshotRequestsTuple.x() != null) {
            bqSnapshotRequest = snapshotRequestsTuple.x();
            bqSnapshotRequests.add(bqSnapshotRequest);
        }

        List<JsonMessage> gcsSnapshotRequests = new ArrayList<>(1);
        if (snapshotRequestsTuple.y() != null) {
            gcsSnapshotRequest = snapshotRequestsTuple.y();
            gcsSnapshotRequests.add(gcsSnapshotRequest);
        }

        // Publish the list of bq snapshot requests to PubSub (the list may be empty)
        bqSnapshotPublishResults = pubSubService.publishTableOperationRequests(
                config.getProjectId(),
                config.getBigQuerySnapshoterTopic(),
                bqSnapshotRequests
        );

        // Publish the list of gcs snapshot requests to PubSub (the list may be empty)
        gcsSnapshotPublishResults = pubSubService.publishTableOperationRequests(
                config.getProjectId(),
                config.getGcsSnapshoterTopic(),
                gcsSnapshotRequests
        );

        // Successes are logged first for both destinations, then failures.
        logPublishedSnapshotRequests(request, "BigQuery", bqSnapshotPublishResults);
        logPublishedSnapshotRequests(request, "GCS", gcsSnapshotPublishResults);

        // NOTE(review): publish failures are only logged as warnings; the end routines below
        // still run afterwards - confirm that this is the intended handling.
        logFailedSnapshotRequests(request, "BigQuery", bqSnapshotPublishResults);
        logFailedSnapshotRequests(request, "GCS", gcsSnapshotPublishResults);
    }

    // run common service end logging and adding pubsub message to processed list
    Utils.runServiceEndRoutines(
            logger,
            request,
            persistentSet,
            persistentSetObjectPrefix,
            request.getTrackingId()
    );

    return new ConfiguratorResponse(
            request.getTargetTable(),
            request.getRunId(),
            request.getTrackingId(),
            request.isDryRun(),
            backupPolicy,
            backupPolicyTuple.y(), // source of the backup policy
            request.getRefTimestamp(),
            isBackupCronTime,
            isTableCreatedBeforeTimeTravel,
            isBackupTime,
            bqSnapshotRequest,
            gcsSnapshotRequest,
            bqSnapshotPublishResults,
            gcsSnapshotPublishResults
    );
}

/**
 * Logs, at info level, the messages successfully published for one snapshot destination.
 * Does nothing when there are no successful messages.
 *
 * @param request        the request whose dry-run flag, tracking id and target table tag the log
 * @param snapshotType   destination label inserted into the log message (e.g. "BigQuery", "GCS")
 * @param publishResults the publish results to report on
 */
private void logPublishedSnapshotRequests(ConfiguratorRequest request, String snapshotType, PubSubPublishResults publishResults) {
    if (publishResults.getSuccessMessages().isEmpty()) {
        return;
    }
    logger.logInfoWithTracker(
            request.isDryRun(),
            request.getTrackingId(),
            request.getTargetTable(),
            String.format("Published %s %s Snapshot requests %s",
                    publishResults.getSuccessMessages().size(),
                    snapshotType,
                    publishResults.getSuccessMessages())
    );
}

/**
 * Logs, at warning level, the messages that failed to publish for one snapshot destination.
 * Does nothing when there are no failed messages.
 *
 * @param request        the request whose dry-run flag, tracking id and target table tag the log
 * @param snapshotType   destination label inserted into the log message (e.g. "BigQuery", "GCS")
 * @param publishResults the publish results to report on
 */
private void logFailedSnapshotRequests(ConfiguratorRequest request, String snapshotType, PubSubPublishResults publishResults) {
    if (publishResults.getFailedMessages().isEmpty()) {
        return;
    }
    logger.logWarnWithTracker(
            request.isDryRun(),
            request.getTrackingId(),
            request.getTargetTable(),
            // %s already applies String.valueOf to the argument, so no explicit toString() is needed
            String.format("Failed to publish %s Snapshot request %s",
                    snapshotType,
                    publishResults.getFailedMessages())
    );
}