public ResponseEntity receiveMessage()

in services/dispatcher-tagging-app/src/main/java/com/google/cloud/pso/bq_pii_classifier/dispatcher/TaggingDispatcherController.java [65:137]


    public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) {

        String runId = TrackingHelper.generateTaggingRunId();
        String state = "";

        try {

            if (requestBody == null || requestBody.getMessage() == null) {
                String msg = "Bad Request: invalid message format";
                logger.logSevereWithTracker(runId, msg);
                throw new NonRetryableApplicationException("Request body or message is Null.");
            }

            String requestJsonString = requestBody.getMessage().dataToUtf8String();

            // remove any escape characters (e.g. from Terraform
            requestJsonString = requestJsonString.replace("\\", "");

            logger.logInfoWithTracker(runId, String.format("Received payload: %s", requestJsonString));

            BigQueryScope bqScope = gson.fromJson(requestJsonString, BigQueryScope.class);

            logger.logInfoWithTracker(runId, String.format("Parsed JSON input %s ", bqScope.toString()));

            Scanner dlpResultsScanner;
            if (environment.getIsAutoDlpMode()){
                dlpResultsScanner = new AutoDlpResultsScannerImpl(
                        environment.getProjectId(),
                        environment.getSolutionDataset(),
                        environment.getDlpTableAuto(),
                        new BigQueryServiceImpl()
                );
            }else{
                dlpResultsScanner = new StandardDlpResultsScannerImpl(
                        environment.getProjectId(),
                        environment.getSolutionDataset(),
                        environment.getDlpTableStandard(),
                        environment.getLoggingTable(),
                        new BigQueryServiceImpl()
                );
            }

            Dispatcher dispatcher = new Dispatcher(
                    environment.toConfig(),
                    new BigQueryServiceImpl(),
                    new PubSubServiceImpl(),
                    dlpResultsScanner,
                    new GCSPersistentSetImpl(environment.getGcsFlagsBucket()),
                    "tagging-dispatcher-flags",
                    runId
            );

            PubSubPublishResults results = dispatcher.execute(bqScope, requestBody.getMessage().getMessageId());

            state = String.format("Publishing results: %s SUCCESS MESSAGES and %s FAILED MESSAGES",
                    results.getSuccessMessages().size(),
                    results.getFailedMessages().size());

            logger.logInfoWithTracker(runId, state);

        } catch (Exception e) {
            logger.logNonRetryableExceptions(runId, e);
            state = String.format("ERROR '%s'", e.getMessage());
        }

        // Always ACK the pubsub message to avoid retries
        // The dispatcher is the entry point and retrying it could cause
        // unnecessary runs and costs

        return new ResponseEntity(
                String.format("Process completed with state = %s", state),
                HttpStatus.OK);
    }