in services/tagger-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/tagger/TaggerController.java [68:179]
// Handles one Pub/Sub push event and runs the Tagger for the table it refers to.
//
// The payload is interpreted in one of two ways:
//   1. As a TaggerRequest JSON document (sent by the BigQuery Snapshoter).
//   2. As a BigQuery export job completion notification (from a log sink; these jobs are
//      originally submitted by the GCS Snapshoter). In that case the TaggerRequest is read
//      from the flags bucket under "snapshoter-gcs-tagger-requests/<jobId>".
//
// requestBody: the Pub/Sub push envelope; a null body or a null message is rejected with a
//              NonRetryableApplicationException.
// Returns: HTTP 200 with a fixed message on success; on any exception, the ResponseEntity
//          chosen by ControllerExceptionHelper.handleException.
//
// No exception escapes this method: everything is caught, handed to ControllerExceptionHelper,
// and then summarized in a single unified log entry before returning.
public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) {

    String trackingId = TrackingHelper.MIN_RUN_ID;
    BackupPolicyService backupPolicyService = null;

    // These values will be updated based on the execution flow and logged at the end
    ResponseEntity responseEntity;
    TaggerRequest taggerRequest = null;
    TaggerResponse taggerResponse = null;
    boolean isSuccess;
    Exception error = null;
    boolean isRetryableError = false;

    try {

        if (requestBody == null || requestBody.getMessage() == null) {
            String msg = "Bad Request: invalid message format";
            logger.logSevereWithTracker(trackingId, null, msg);
            throw new NonRetryableApplicationException("Request body or message is Null.");
        }

        String requestJsonString = requestBody.getMessage().dataToUtf8String();

        logger.logInfoWithTracker(trackingId, null, String.format("Received payload: %s", requestJsonString));

        // Describes where the TaggerRequest JSON came from; only used in the error message
        // raised below when that JSON does not yield a request object.
        String taggerRequestSource;

        // The received pubsub message could have been sent by two different sources
        // 1. BigQuery Snapshoter: as a TaggerRequest JSON payload
        // 2. From a log sink listening for BQ export job completion events. These jobs are originally submitted by the GCS Snapshoter
        if (isGCSExportJobMessage(requestJsonString)) {

            // parse the pubsub request as a BQ Export job completion notification
            String jobId = getGcsExportJobId(requestJsonString);
            String jobProjectId = getGcsExportJobProjectId(requestJsonString);

            // Switch to the job's tracking id as early as possible so that any failure
            // from here on is logged against the right run.
            trackingId = TrackingHelper.parseTrackingIdFromBQExportJobId(jobId);

            boolean isSuccessfulJob = isSuccessfulJob(requestJsonString);
            String jobError = getGcsExportJobError(requestJsonString);

            PersistentMap persistentMap = new GcsPersistentMapImpl(environment.getGcsFlagsBucket());
            String taggerRequestFile = String.format("%s/%s", "snapshoter-gcs-tagger-requests", jobId);
            String taggerRequestJson = persistentMap.get(taggerRequestFile);
            taggerRequest = gson.fromJson(taggerRequestJson, TaggerRequest.class);
            taggerRequestSource = String.format("persisted file '%s'", taggerRequestFile);

            // After parsing the taggerRequest for tracking, throw a non retryable exception if the backup job failed
            if (!isSuccessfulJob) {
                String msg = String.format("GCS export job '%s' on project '%s' has failed with error `%s`. Please check the BigQuery logs in the backup project where the job ran.",
                        jobId,
                        jobProjectId,
                        jobError
                );
                throw new NonRetryableApplicationException(msg);
            }
        } else {
            // parse the pubsub request as a taggerRequest (from BQ Snapshoter)
            taggerRequest = gson.fromJson(requestJsonString, TaggerRequest.class);
            taggerRequestSource = "the Pub/Sub message payload";
        }

        // Gson returns null for a null, empty or literal "null" JSON document. Fail with an
        // explicit message instead of a NullPointerException on the dereference below.
        // NOTE(review): assumes re-delivering the same message cannot fix this - confirm.
        if (taggerRequest == null) {
            throw new NonRetryableApplicationException(
                    String.format("Could not parse a TaggerRequest from %s.", taggerRequestSource)
            );
        }

        trackingId = taggerRequest.getTrackingId();

        logger.logInfoWithTracker(taggerRequest.isDryRun(), trackingId, taggerRequest.getTargetTable(), String.format("Parsed Request: %s", taggerRequest.toString()));

        backupPolicyService = new BackupPolicyServiceGCSImpl(environment.getGcsBackupPoliciesBucket());

        Tagger tagger = new Tagger(
                environment.toConfig(),
                backupPolicyService,
                new GCSPersistentSetImpl(environment.getGcsFlagsBucket()),
                "tagger-flags",
                functionNumber
        );

        taggerResponse = tagger.execute(
                taggerRequest,
                requestBody.getMessage().getMessageId()
        );

        responseEntity = new ResponseEntity("Process completed successfully.", HttpStatus.OK);
        isSuccess = true;

    } catch (Exception e) {

        // The helper picks the HTTP response and reports whether the error is retryable.
        Tuple<ResponseEntity, Boolean> handlingResults = ControllerExceptionHelper.handleException(e,
                logger,
                trackingId,
                taggerRequest == null ? null : taggerRequest.getTargetTable()
        );
        isSuccess = false;
        responseEntity = handlingResults.x();
        isRetryableError = handlingResults.y();
        error = e;

    } finally {

        // Only set once the request was parsed successfully; shut it down on every exit path.
        if (backupPolicyService != null) {
            backupPolicyService.shutdown();
        }
    }

    // Single summary entry for this invocation; request fields are null when parsing never
    // got far enough to produce a TaggerRequest.
    logger.logUnified(
            taggerRequest == null ? null : taggerRequest.isDryRun(),
            functionNumber.toString(),
            taggerRequest == null ? null : taggerRequest.getRunId(),
            taggerRequest == null ? null : taggerRequest.getTrackingId(),
            taggerRequest == null ? null : taggerRequest.getTargetTable(),
            taggerRequest,
            taggerResponse,
            isSuccess,
            error,
            isRetryableError
    );

    return responseEntity;
}