public PubSubPublishResults execute()

in services/library/src/main/java/com/google/cloud/pso/bq_pii_classifier/functions/dispatcher/Dispatcher.java [78:167]


    /**
     * Resolves the given BigQuery scope into table-operation requests and publishes them to the configured output
     * PubSub topic, then logs the tracking ID of every successfully published request.
     *
     * <p>Scope resolution: a non-empty dataset include list takes precedence (the project include list is then
     * ignored); otherwise the project include list is used. The dataset and table exclude lists are passed along in
     * both cases.
     *
     * @param bqScope         the scan scope (include/exclude lists) to resolve into tables
     * @param pubSubMessageId ID of the PubSub message that triggered this run; used as a de-duplication key so a
     *                        re-delivered message does not re-run the dispatcher
     * @return the publish results, holding both the successfully published and the failed messages
     * @throws NonRetryableApplicationException if this pubSubMessageId has already been processed, or if both the
     *                                          dataset and the project include lists are empty
     * @throws IOException                      propagated from the persistence, listing or publishing calls
     * @throws InterruptedException             propagated from the listing or publishing calls
     */
    public PubSubPublishResults execute(BigQueryScope bqScope, String pubSubMessageId) throws IOException, NonRetryableApplicationException, InterruptedException {

        /*
         * Check whether this pubSubMessageId was processed before, to avoid re-running the dispatcher (and the whole
         * downstream process) if PubSub unexpectedly re-sends the message. This is an extra measure to avoid
         * unnecessary cost. A simple flag entry named "<persistentSetObjectPrefix>/<pubSubMessageId>" is kept in the
         * persistent set (flag files in GCS, per the original design).
         *
         * NOTE(review): contains() and add() are two separate calls, so two concurrent deliveries of the same message
         * could both pass the check. Also, the flag is written before any work is done: if a later step in this
         * method throws a retryable exception, the re-delivered message will be rejected here as already processed.
         * Confirm both are acceptable trade-offs.
         */
        String flagFileName = String.format("%s/%s", persistentSetObjectPrefix, pubSubMessageId);
        if (persistentSet.contains(flagFileName)) {
            // Non-retryable: per the message below, the PubSub message should be ACKed so that retries stop.
            String msg = String.format("PubSub message ID '%s' has been processed before by the dispatcher. The message should be ACK to PubSub to stop retries. Please investigate further why the message was retried in the first place.",
                    pubSubMessageId);
            throw new NonRetryableApplicationException(msg);
        }
        logger.logInfoWithTracker(runId, String.format("Persisting processing key for PubSub message ID %s", pubSubMessageId));
        persistentSet.add(flagFileName);

        /*
         * Resolve which tables to dispatch requests for. Resolution is bottom-up (DATASETS > PROJECTS): a lower-level
         * include list (datasets) overrides a higher-level one (projects).
         *  - If DATASETS_INCLUDE is non-empty: only tables in these datasets, skipping datasets in DATASETS_EXCLUDE
         *    and tables in TABLES_EXCLUDE; PROJECTS_INCLUDE is ignored.
         *  - Else if PROJECTS_INCLUDE is non-empty: datasets and tables in these projects, skipping datasets in
         *    DATASETS_EXCLUDE and tables in TABLES_EXCLUDE.
         *  - Otherwise the scope is rejected as invalid.
         */
        List<JsonMessage> pubSubMessagesToPublish;
        if (!bqScope.getDatasetIncludeList().isEmpty()) {
            pubSubMessagesToPublish = processDatasets(
                    bqScope.getDatasetIncludeList(),
                    bqScope.getDatasetExcludeList(),
                    bqScope.getTableExcludeList());
        } else if (!bqScope.getProjectIncludeList().isEmpty()) {
            pubSubMessagesToPublish = processProjects(
                    bqScope.getProjectIncludeList(),
                    bqScope.getDatasetExcludeList(),
                    bqScope.getTableExcludeList());
        } else {
            // Only the dataset and project include lists are consulted above, so only those are named as options.
            throw new NonRetryableApplicationException("At least one of the following params must be not empty [datasetIncludeList, projectIncludeList]");
        }

        // Publish the list of table-operation requests to PubSub.
        PubSubPublishResults publishResults = pubSubService.publishTableOperationRequests(
                config.getProjectId(),
                config.getOutputTopic(),
                pubSubMessagesToPublish
        );

        for (FailedPubSubMessage msg : publishResults.getFailedMessages()) {
            String logMsg = String.format("Failed to publish this messages %s", msg.toString());
            logger.logWarnWithTracker(runId, logMsg);
        }

        /*
         * Log the tracking ID of every dispatched request. This lets requests dispatched within this runId that fail
         * in later stages (e.g. the Tagger) be detected. The concrete request type carried by each message depends on
         * the solution mode and dispatcher type, which are read once here (assumes config is not mutated mid-run).
         */
        SolutionMode solutionMode = config.getSolutionMode();
        DispatcherType dispatcherType = config.getDispatcherType();
        for (SuccessPubSubMessage msg : publishResults.getSuccessMessages()) {
            if (solutionMode.equals(SolutionMode.STANDARD_DLP) && dispatcherType.equals(DispatcherType.INSPECTION)) {
                InspectorRequest request = (InspectorRequest) msg.getMsg();
                logger.logSuccessDispatcherTrackingId(runId, request.getTrackingId(), request.getTargetTable());
            } else if (solutionMode.equals(SolutionMode.AUTO_DLP)) {
                TaggerTableSpecRequest request = (TaggerTableSpecRequest) msg.getMsg();
                logger.logSuccessDispatcherTrackingId(runId, request.getTrackingId(), request.getTargetTable());
            } else if (solutionMode.equals(SolutionMode.STANDARD_DLP) && dispatcherType.equals(DispatcherType.TAGGING)) {
                TaggerDlpJobRequest request = (TaggerDlpJobRequest) msg.getMsg();
                logger.logSuccessDispatcherTrackingId(runId, request.getTrackingId());
            }
            // Any other mode/type combination logs no tracking ID.
        }

        logger.logFunctionEnd(runId);

        return publishResults;
    }