public ResponseEntity receiveMessage()

in services/tagger-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/tagger/TaggerController.java [68:179]


    public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) {

        String trackingId = TrackingHelper.MIN_RUN_ID;
        BackupPolicyService backupPolicyService = null;

        // These values will be updated based on the execution flow and logged at the end
        ResponseEntity responseEntity;
        TaggerRequest taggerRequest = null;
        TaggerResponse taggerResponse = null;
        boolean isSuccess;
        Exception error = null;
        boolean isRetryableError= false;

        try {

            if (requestBody == null || requestBody.getMessage() == null) {
                String msg = "Bad Request: invalid message format";
                logger.logSevereWithTracker(trackingId, null, msg);
                throw new NonRetryableApplicationException("Request body or message is Null.");
            }

            String requestJsonString = requestBody.getMessage().dataToUtf8String();

            logger.logInfoWithTracker(trackingId, null, String.format("Received payload: %s", requestJsonString));

            // The received pubsub message could have been sent by two different sources
            // 1. BigQuery Snapshoter: as a TaggerRequest JSON payload
            // 2. From a log sink listening for BQ export job completion events. These jobs are originally submitted by the GCS Snapshoter

            boolean isGCSExportJobMessage = isGCSExportJobMessage(requestJsonString);
            if(isGCSExportJobMessage){
                // parse the pubsub request as a BQ Export job completion notification

                String jobId = getGcsExportJobId(requestJsonString);
                String jobProjectId = getGcsExportJobProjectId(requestJsonString);
                trackingId = TrackingHelper.parseTrackingIdFromBQExportJobId(jobId);
                boolean isSuccessfulJob = isSuccessfulJob(requestJsonString);
                String jobError = getGcsExportJobError(requestJsonString);

                PersistentMap persistentMap = new GcsPersistentMapImpl(environment.getGcsFlagsBucket());
                String taggerRequestFile = String.format("%s/%s", "snapshoter-gcs-tagger-requests", jobId);
                String taggerRequestJson = persistentMap.get(taggerRequestFile);
                taggerRequest = gson.fromJson(taggerRequestJson, TaggerRequest.class);

                // After parsing the taggerRequest for tracking, throw a non retryable exception if the backup job failed
                if (!isSuccessfulJob){
                    String msg = String.format("GCS export job '%s' on project '%s' has failed with error `%s`. Please check the BigQuery logs in the backup project where the job ran.",
                            jobId,
                            jobProjectId,
                            jobError
                    );
                    throw new NonRetryableApplicationException(msg);
                }

            }else{
                // parse the pubsub request as a taggerRequest (from BQ Snapshoter)
                taggerRequest = gson.fromJson(requestJsonString, TaggerRequest.class);
            }

            trackingId = taggerRequest.getTrackingId();

            logger.logInfoWithTracker(taggerRequest.isDryRun(), trackingId, taggerRequest.getTargetTable(), String.format("Parsed Request: %s", taggerRequest.toString()));

            backupPolicyService = new BackupPolicyServiceGCSImpl(environment.getGcsBackupPoliciesBucket());
            Tagger tagger = new Tagger(
                    environment.toConfig(),
                    backupPolicyService,
                    new GCSPersistentSetImpl(environment.getGcsFlagsBucket()),
                    "tagger-flags",
                    functionNumber
            );

            taggerResponse = tagger.execute(
                    taggerRequest,
                    requestBody.getMessage().getMessageId()
            );

            responseEntity = new ResponseEntity("Process completed successfully.", HttpStatus.OK);
            isSuccess = true;
        } catch (Exception e) {

            Tuple<ResponseEntity, Boolean> handlingResults  = ControllerExceptionHelper.handleException(e,
                    logger,
                    trackingId,
                    taggerRequest == null? null: taggerRequest.getTargetTable()
                    );
            isSuccess = false;
            responseEntity = handlingResults.x();
            isRetryableError = handlingResults.y();
            error = e;

        }finally {
            if(backupPolicyService != null){
                backupPolicyService.shutdown();
            }
        }

        logger.logUnified(
                taggerRequest == null? null: taggerRequest.isDryRun(),
                functionNumber.toString(),
                taggerRequest == null? null: taggerRequest.getRunId(),
                taggerRequest == null? null: taggerRequest.getTrackingId(),
                taggerRequest == null? null : taggerRequest.getTargetTable(),
                taggerRequest,
                taggerResponse,
                isSuccess,
                error,
                isRetryableError
        );

        return responseEntity;
    }