public ResponseEntity receiveMessage()

in services/dispatcher-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/dispatcher/DispatcherController.java [67:132]


    public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) {

        String runId = TrackingHelper.MIN_RUN_ID;
        String state = "";
        // These values will be updated based on the execution flow and logged at the end

        try {

            if (requestBody == null || requestBody.getMessage() == null) {
                String msg = "Bad Request: invalid message format";
                logger.logSevereWithTracker(runId, null, msg);
                throw new NonRetryableApplicationException("Request body or message is Null.");
            }

            String requestJsonString = requestBody.getMessage().dataToUtf8String();

            // remove any escape characters (e.g. from Terraform
            requestJsonString = requestJsonString.replace("\\", "");

            logger.logInfoWithTracker(runId, null, String.format("Received payload: %s", requestJsonString));

            DispatcherRequest dispatcherRequest = gson.fromJson(requestJsonString, DispatcherRequest.class);

            if(dispatcherRequest.isDryRun()){
                runId = TrackingHelper.generateDryRunId();
            }else{
                if(dispatcherRequest.isForceRun()){
                    runId = TrackingHelper.generateForcedRunId();
                }else{
                    runId = TrackingHelper.generateHeartBeatRunId();
                }
            }

            logger.logInfoWithTracker(dispatcherRequest.isDryRun(), runId, null, String.format("Parsed dispatcher request %s ", dispatcherRequest.toString()));

            Dispatcher dispatcher = new Dispatcher(
                    environment.toConfig(),
                    new PubSubServiceImpl(),
                    new ResourceScannerImpl(),
                    new GCSPersistentSetImpl(environment.getGcsFlagsBucket()),
                    "dispatcher-flags",
                    functionNumber,
                    runId
            );

            PubSubPublishResults results = dispatcher.execute(dispatcherRequest, requestBody.getMessage().getMessageId());

            state = String.format("Publishing results: %s SUCCESS MESSAGES and %s FAILED MESSAGES",
                    results.getSuccessMessages().size(),
                    results.getFailedMessages().size());

            logger.logInfoWithTracker(dispatcherRequest.isDryRun(), runId, null, state);

        } catch (Exception e) {
            logger.logNonRetryableExceptions(runId, null, e);
            state = String.format("ERROR '%s'", e.getMessage());
        }

        // Always ACK the pubsub message to avoid retries
        // The dispatcher is the entry point and retrying it could cause
        // unnecessary runs and costs

        return new ResponseEntity(
                String.format("Process completed with state = %s", state),
                HttpStatus.OK);
    }