function numerical_stats()

in dlp/src/numerical_stats.php [51:170]


function numerical_stats(
    string $callingProjectId,
    string $dataProjectId,
    string $topicId,
    string $subscriptionId,
    string $datasetId,
    string $tableId,
    string $columnName
): void {
    // Instantiate a client.
    $dlp = new DlpServiceClient();
    $pubsub = new PubSubClient();
    $topic = $pubsub->topic($topicId);

    // Construct risk analysis config
    $columnField = (new FieldId())
        ->setName($columnName);

    $statsConfig = (new NumericalStatsConfig())
        ->setField($columnField);

    $privacyMetric = (new PrivacyMetric())
        ->setNumericalStatsConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = "projects/$callingProjectId/locations/global";
    $createDlpJobRequest = (new CreateDlpJobRequest())
        ->setParent($parent)
        ->setRiskJob($riskJob);
    $job = $dlp->createDlpJob($createDlpJobRequest);

    // Poll Pub/Sub using exponential backoff until job finishes
    // Consider using an asynchronous execution model such as Cloud Functions
    $attempt = 1;
    $startTime = time();
    do {
        foreach ($subscription->pull() as $message) {
            if (
                isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()
            ) {
                $subscription->acknowledge($message);
                // Get the updated job. Loop to avoid race condition with DLP API.
                do {
                    $getDlpJobRequest = (new GetDlpJobRequest())
                        ->setName($job->getName());
                    $job = $dlp->getDlpJob($getDlpJobRequest);
                } while ($job->getState() == JobState::RUNNING);
                break 2; // break from parent do while
            }
        }
        print('Waiting for job to complete' . PHP_EOL);
        // Exponential backoff with max delay of 60 seconds
        sleep(min(60, pow(2, ++$attempt)));
    } while (time() - $startTime < 600); // 10 minute timeout

    // Helper function to convert Protobuf values to strings
    $valueToString = function ($value) {
        $json = json_decode($value->serializeToJsonString(), true);
        return array_shift($json);
    };

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
    switch ($job->getState()) {
        case JobState::DONE:
            $results = $job->getRiskDetails()->getNumericalStatsResult();
            printf(
                'Value range: [%s, %s]' . PHP_EOL,
                $valueToString($results->getMinValue()),
                $valueToString($results->getMaxValue())
            );

            // Only print unique values
            $lastValue = null;
            foreach ($results->getQuantileValues() as $percent => $quantileValue) {
                $value = $valueToString($quantileValue);
                if ($value != $lastValue) {
                    printf('Value at %s quantile: %s' . PHP_EOL, $percent, $value);
                    $lastValue = $value;
                }
            }

            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        case JobState::PENDING:
            print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
            break;
        default:
            print('Unexpected job state. Most likely, the job is either running or has not yet started.');
    }
}