in dlp/src/k_map.php [56:198]
/**
 * Runs a DLP k-map estimation risk analysis job on a BigQuery table and prints the result.
 *
 * The job is created with a Pub/Sub action, and this function then pulls the given
 * subscription until a message whose `DlpJobName` attribute matches the created job
 * arrives or ten minutes have elapsed. On completion it prints the k-map estimation
 * histogram; on failure it dumps the job's error details.
 *
 * @param string   $callingProjectId Project ID used to build the parent of the DLP job request.
 * @param string   $dataProjectId    Project ID set on the BigQuery source table.
 * @param string   $topicId          Pub/Sub topic the job publishes to when it finishes.
 * @param string   $subscriptionId   Subscription on that topic that is pulled for the notification.
 * @param string   $datasetId        Dataset ID of the BigQuery source table.
 * @param string   $tableId          Table ID of the BigQuery source table.
 * @param string   $regionCode       Region code passed to the k-map estimation config.
 * @param string[] $quasiIdNames     Field names to treat as quasi-identifiers.
 * @param string[] $infoTypes        InfoType names, paired by position with $quasiIdNames.
 *
 * @throws Exception If $quasiIdNames and $infoTypes do not have the same number of elements.
 */
function k_map(
    string $callingProjectId,
    string $dataProjectId,
    string $topicId,
    string $subscriptionId,
    string $datasetId,
    string $tableId,
    string $regionCode,
    array $quasiIdNames,
    array $infoTypes
): void {
    // Verify input first so that bad arguments fail before any client is created.
    if (count($infoTypes) !== count($quasiIdNames)) {
        throw new Exception('Number of infoTypes and number of quasi-identifiers must be equal!');
    }

    // Overall budget, in seconds, for waiting on the job (10 minutes).
    $timeoutSeconds = 600;

    // Instantiate a client.
    $dlp = new DlpServiceClient();
    $pubsub = new PubSubClient();
    $topic = $pubsub->topic($topicId);

    // Map infoTypes to quasi-ids (paired by position).
    $quasiIdObjects = array_map(static function ($quasiId, $infoType) {
        $quasiIdField = (new FieldId())
            ->setName($quasiId);
        $quasiIdType = (new InfoType())
            ->setName($infoType);

        return (new TaggedField())
            ->setInfoType($quasiIdType)
            ->setField($quasiIdField);
    }, $quasiIdNames, $infoTypes);

    // Construct analysis config
    $statsConfig = (new KMapEstimationConfig())
        ->setQuasiIds($quasiIdObjects)
        ->setRegionCode($regionCode);

    $privacyMetric = (new PrivacyMetric())
        ->setKMapEstimationConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = "projects/$callingProjectId/locations/global";
    $createDlpJobRequest = (new CreateDlpJobRequest())
        ->setParent($parent)
        ->setRiskJob($riskJob);
    $job = $dlp->createDlpJob($createDlpJobRequest);

    // Poll Pub/Sub using exponential backoff until job finishes
    // Consider using an asynchronous execution model such as Cloud Functions
    $attempt = 1;
    $startTime = time();
    do {
        foreach ($subscription->pull() as $message) {
            $attributes = $message->attributes();
            if (isset($attributes['DlpJobName']) && $attributes['DlpJobName'] === $job->getName()) {
                $subscription->acknowledge($message);

                // Get the updated job. The notification can arrive while the job is still
                // reported as RUNNING, so re-fetch until the state changes. A short pause
                // between calls and the shared deadline keep this from busy-looping forever.
                $getDlpJobRequest = (new GetDlpJobRequest())
                    ->setName($job->getName());
                while (true) {
                    $job = $dlp->getDlpJob($getDlpJobRequest);
                    if ($job->getState() !== JobState::RUNNING || time() - $startTime >= $timeoutSeconds) {
                        break;
                    }
                    sleep(1);
                }
                break 2; // break from parent do while
            }
        }
        print('Waiting for job to complete' . PHP_EOL);
        // Exponential backoff with max delay of 60 seconds
        sleep(min(60, 2 ** ++$attempt));
    } while (time() - $startTime < $timeoutSeconds);

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
    switch ($job->getState()) {
        case JobState::DONE:
            $histBuckets = $job->getRiskDetails()->getKMapEstimationResult()->getKMapEstimationHistogram();

            foreach ($histBuckets as $bucketIndex => $histBucket) {
                // Print bucket stats
                printf('Bucket %s:' . PHP_EOL, $bucketIndex);
                printf(
                    '  Anonymity range: [%s, %s]' . PHP_EOL,
                    $histBucket->getMinAnonymity(),
                    $histBucket->getMaxAnonymity()
                );
                printf('  Size: %s' . PHP_EOL, $histBucket->getBucketSize());

                // Print bucket values
                foreach ($histBucket->getBucketValues() as $valueBucket) {
                    printf(
                        '  Estimated k-map anonymity: %s' . PHP_EOL,
                        $valueBucket->getEstimatedAnonymity()
                    );

                    // Pretty-print quasi-ID values
                    print('  Values: ' . PHP_EOL);
                    foreach ($valueBucket->getQuasiIdsValues() as $value) {
                        print('    ' . $value->serializeToJsonString() . PHP_EOL);
                    }
                }
            }
            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        case JobState::PENDING:
        case JobState::RUNNING:
            // Either no notification arrived in time, or the job was still running at the deadline.
            print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
            break;
        default:
            print('Unexpected job state. Most likely, the job is either running or has not yet started.');
    }
}