public DlpJob execute()

in services/library/src/main/java/com/google/cloud/pso/bq_pii_classifier/functions/inspector/Inspector.java [72:134]


    /**
     * Submits a Cloud DLP inspection job for the request's target table.
     *
     * <p>Before doing any work, the method checks {@code persistentSet} for a flag entry named
     * after the PubSub message ID. If the entry exists, this class already handled the message,
     * and the method fails fast with a non-retryable error so the caller can ACK it instead of
     * submitting another DLP job. After a successful submission the flag entry is added.
     *
     * <p>NOTE(review): the {@code contains} check and the later {@code add} are not atomic, so two
     * concurrent deliveries of the same message could both pass the check.
     *
     * @param request the inspection request: target table, inspection template, job region, and
     *     the tracking ID that is used as the DLP job ID
     * @param trackingId identifier used to correlate log entries for this run
     * @param pubSubMessageId ID of the PubSub message that triggered this run; used as the flag
     *     entry name under {@code persistentSetObjectPrefix}
     * @return the DLP job as returned by {@code dlpService.submitJob}
     * @throws IOException propagated from the underlying service calls (TODO confirm which ones)
     * @throws NonRetryableApplicationException if {@code pubSubMessageId} was already processed
     */
    public DlpJob execute(InspectorRequest request, String trackingId, String pubSubMessageId) throws IOException, NonRetryableApplicationException {

        logger.logFunctionStart(trackingId);
        logger.logInfoWithTracker(trackingId, String.format("Request : %s", request.toString()));

        // Check whether this pubSubMessageId was already processed, so we don't submit another job
        // if PubSub unexpectedly re-sends the message. This is an extra measure to avoid
        // unnecessary cost. Processed IDs are tracked as simple flag entries in persistentSet,
        // named after the pubSubMessageId.
        String flagFileName = String.format("%s/%s", persistentSetObjectPrefix, pubSubMessageId);
        if (persistentSet.contains(flagFileName)) {
            // Fail as non-retryable so the message gets ACKed and retries stop.
            String msg = String.format("PubSub message ID '%s' has been processed before by %s. The message should be ACK to PubSub to stop retries. Please investigate further why the message was retried in the first place.",
                    pubSubMessageId,
                    this.getClass().getSimpleName()
                    );
            throw new NonRetryableApplicationException(msg);
        }

        // Load the table scan limits config.
        TableScanLimitsConfig tableScanLimitsConfig = new TableScanLimitsConfig(
                config.getTableScanLimitsJsonConfig());

        logger.logInfoWithTracker(trackingId,
                String.format("TableScanLimitsConfig is %s", tableScanLimitsConfig.toString()));

        // The DLP job config accepts only an Integer for the table scan limit, so the row count
        // (a BigInteger-like Number) must be narrowed. A plain intValue() keeps only the low
        // 32 bits for tables with more than Integer.MAX_VALUE rows, which can even give a
        // negative count. Widen to long first and saturate at Integer.MAX_VALUE instead.
        long rawTableNumRows = bqService.getTableNumRows(request.getTargetTable()).longValue();
        int tableNumRows = (int) Math.min(rawTableNumRows, Integer.MAX_VALUE);
        if (rawTableNumRows > Integer.MAX_VALUE) {
            logger.logInfoWithTracker(trackingId, String.format(
                    "Table row count %d exceeds Integer.MAX_VALUE; capping to %d for the DLP scan limit",
                    rawTableNumRows,
                    tableNumRows));
        }

        InspectJobConfig inspectJobConfig = createJob(
                request.getTargetTable(),
                tableScanLimitsConfig,
                tableNumRows,
                request.getInspectionTemplate()
        );

        CreateDlpJobRequest createDlpJobRequest = CreateDlpJobRequest.newBuilder()
                // Letters, numbers, hyphens, and underscores allowed.
                .setJobId(request.getTrackingId())
                // create the job in the host project, in the source data region to avoid network cost
                .setParent(LocationName.of(config.getProjectId(), request.getJobRegion()).toString())
                .setInspectJob(inspectJobConfig)
                .build();

        DlpJob submittedDlpJob = dlpService.submitJob(createDlpJobRequest);

        logger.logInfoWithTracker(trackingId, String.format("DLP job created successfully id='%s' for inspection template %s",
                submittedDlpJob.getName(),
                request.getInspectionTemplate()
        ));

        // Add a flag entry marking that this request has been completed and needs no further
        // runs, in case PubSub keeps retrying because of an ACK timeout after the service has
        // already processed the request. This is an extra measure to avoid unnecessary cost due
        // to config issues.
        logger.logInfoWithTracker(trackingId, String.format("Persisting processing key for PubSub message ID %s", pubSubMessageId));
        persistentSet.add(flagFileName);

        logger.logFunctionEnd(trackingId);

        return submittedDlpJob;
    }