public Map execute()

in services/library/src/main/java/com/google/cloud/pso/bq_pii_classifier/functions/tagger/Tagger.java [85:186]


    /**
     * Runs the tagging step for a single request.
     *
     * <p>The method first refuses to run twice for the same PubSub message, then looks up the
     * DLP findings for {@code lookUpKey}. If findings exist and the target table is still
     * present, it logs the computed labels and applies policy tags and labels to the table.
     * Finally it records {@code pubSubMessageId} as processed, whatever the outcome of the lookup.
     *
     * @param request         operation being processed; its tracking id is attached to every log entry
     * @param lookUpKey       key handed to the findings reader (DLP job id in Standard mode,
     *                        "project.dataset.table" in Auto DLP mode)
     * @param pubSubMessageId id of the triggering PubSub message, used to build the processed-flag name
     * @return the mapping produced from the updated table fields, or an empty map when there are
     *         no findings for the key or the target table no longer exists
     * @throws NonRetryableApplicationException if a processed-flag already exists for {@code pubSubMessageId}
     * @throws IOException          propagated from the collaborators invoked here
     * @throws InterruptedException propagated from the collaborators invoked here
     */
    public Map<String, String> execute(
            Operation request,
            String lookUpKey,
            String pubSubMessageId
    ) throws IOException, InterruptedException, NonRetryableApplicationException {

        final String trackingId = request.getTrackingId();

        logger.logFunctionStart(trackingId);
        logger.logInfoWithTracker(trackingId, String.format("Request : %s", request.toString()));

        // Idempotency guard: a flag named after the PubSub message id is stored once a request
        // has been fully handled. Finding it here means PubSub re-delivered the message, so we
        // bail out with a non-retryable error instead of paying for the BigQuery work again.
        final String processedFlag = String.format("%s/%s", persistentSetObjectPrefix, pubSubMessageId);
        final boolean alreadyProcessed = persistentSet.contains(processedFlag);
        if (alreadyProcessed) {
            throw new NonRetryableApplicationException(
                    String.format("PubSub message ID '%s' has been processed before by the Tagger. The message should be ACK to PubSub to stop retries. Please investigate further why the message was retried in the first place.",
                            pubSubMessageId));
        }

        // Read the DLP results for this key. The key is the DLP job id in Standard mode and
        // "project.dataset.table" in Auto DLP mode, where no job ids are available.
        final TablePolicyTags findings = findingsReader.getFieldsToPolicyTagsMap(lookUpKey);

        // TODO: check whether this separate result map is needed, or whether the computed
        //  mapping from the findings could be returned directly.
        Map<String, String> result = new HashMap<>();

        if (findings == null) {
            // Nothing was found for this table / DLP job: just report it.
            final String noFindingsMsg = String.format(
                    "No DLP InfoTypes or mapped policy tags are found for lookUpKey '%s'",
                    lookUpKey);
            logger.logInfoWithTracker(trackingId, noFindingsMsg);
        } else {
            final Map<String, PolicyTagInfo> fieldsToTags = findings.getFieldsPolicyTags();
            final TableSpec tableSpec = findings.getTableSpec();

            // Findings may refer to a table that has since been deleted (Auto DLP mode, or a
            // re-tagging run in Standard mode), so confirm the table exists before touching it.
            if (!bqService.tableExists(tableSpec)) {
                final String missingTableMsg = String.format(
                        "Table %s doesn't exist anymore in BigQuery and no tagging could be applied",
                        tableSpec.toSqlString());
                logger.logWarnWithTracker(trackingId, missingTableMsg);
            } else {
                final String mappingMsg = String.format(
                        "Computed Fields to Policy Tags mapping : %s",
                        fieldsToTags.toString());
                logger.logInfoWithTracker(trackingId, mappingMsg);

                // Table labels are built from the findings together with the configured info type map.
                final Map<String, String> labels =
                        generateTableLabelsFromDlpFindings(findings, config.getInfoTypeMap());

                // Record every label computed for this table in the labels history log.
                for (Map.Entry<String, String> label : labels.entrySet()) {
                    logger.logLabelsHistory(
                            tableSpec,
                            label.getKey(),
                            label.getValue(),
                            config.isDryRunLabels(),
                            trackingId);
                }

                // Apply tags and labels to the table; the dry-run flags from the config are
                // passed through so the callee decides whether anything is actually written.
                final List<TableFieldSchema> taggedFields = applyPolicyTagsAndLabels(
                        tableSpec,
                        fieldsToTags,
                        labels,
                        config.getAppOwnedTaxonomies(),
                        config.isDryRunTags(),
                        config.isDryRunLabels(),
                        trackingId);

                result = mapFieldsToPolicyTags(taggedFields);
            }
        }

        // Mark this message as done so that a retry loop on the PubSub side (e.g. an ACK
        // timeout after the work already finished) is stopped by the guard at the top.
        final String persistMsg =
                String.format("Persisting processing key for PubSub message ID %s", pubSubMessageId);
        logger.logInfoWithTracker(trackingId, persistMsg);
        persistentSet.add(processedFlag);

        logger.logFunctionEnd(trackingId);

        return result;
    }