in dlp/src/inspect_bigquery_with_sampling.php [56:173]
/**
 * Inspect a sample of a BigQuery table for PERSON_NAME findings using a DLP
 * inspect job, wait for the job's Pub/Sub completion notification, and print
 * the resulting finding counts (or the job's errors / non-final state).
 *
 * @param string $callingProjectId Project used to build the DLP parent
 *                                 ("projects/<id>/locations/global") under which the job is created.
 * @param string $topicId          Pub/Sub topic the job publishes its completion notification to.
 * @param string $subscriptionId   Subscription on that topic which is polled for the notification.
 * @param string $projectId        Project that owns the BigQuery table to inspect.
 * @param string $datasetId        Dataset that contains the BigQuery table.
 * @param string $tableId          BigQuery table to inspect.
 */
function inspect_bigquery_with_sampling(
    string $callingProjectId,
    string $topicId,
    string $subscriptionId,
    string $projectId,
    string $datasetId,
    string $tableId
): void {
    // Instantiate the clients.
    $dlp = new DlpServiceClient();
    $pubsub = new PubSubClient();
    $topic = $pubsub->topic($topicId);

    // Specify the BigQuery table to be inspected.
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($projectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Restrict the scan to at most 1000 rows selected with the RANDOM_START
    // sample method, and use the "name" column as the identifying field.
    $bigQueryOptions = (new BigQueryOptions())
        ->setTableReference($bigqueryTable)
        ->setRowsLimit(1000)
        ->setSampleMethod(SampleMethod::RANDOM_START)
        ->setIdentifyingFields([
            (new FieldId())
                ->setName('name')
        ]);

    $storageConfig = (new StorageConfig())
        ->setBigQueryOptions($bigQueryOptions);

    // Specify the type of info the inspection will look for.
    // See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
    $personNameInfoType = (new InfoType())
        ->setName('PERSON_NAME');
    $infoTypes = [$personNameInfoType];

    // Specify how the content should be inspected.
    $inspectConfig = (new InspectConfig())
        ->setInfoTypes($infoTypes)
        ->setIncludeQuote(true);

    // Specify the action that is triggered when the job completes.
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Configure the long running job we want the service to perform.
    $inspectJob = (new InspectJobConfig())
        ->setInspectConfig($inspectConfig)
        ->setStorageConfig($storageConfig)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = "projects/$callingProjectId/locations/global";
    $job = $dlp->createDlpJob($parent, [
        'inspectJob' => $inspectJob
    ]);

    // Poll Pub/Sub using exponential backoff until job finishes
    // Consider using an asynchronous execution model such as Cloud Functions
    $attempt = 1;
    $startTime = time();
    do {
        foreach ($subscription->pull() as $message) {
            if (
                isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()
            ) {
                $subscription->acknowledge($message);
                // Get the updated job. The notification can arrive while the
                // DLP API still reports RUNNING, so re-fetch until the state
                // changes, pausing between requests instead of busy-looping.
                $job = $dlp->getDlpJob($job->getName());
                while ($job->getState() === JobState::RUNNING) {
                    sleep(1);
                    $job = $dlp->getDlpJob($job->getName());
                }
                break 2; // break from parent do while
            }
        }
        printf('Waiting for job to complete' . PHP_EOL);
        // Exponential backoff with max delay of 60 seconds.
        // $attempt is pre-incremented from 1, so the delays are 4, 8, 16, 32, 60, ...
        sleep(min(60, pow(2, ++$attempt)));
    } while (time() - $startTime < 600); // 10 minute timeout

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
    switch ($job->getState()) {
        case JobState::DONE:
            $infoTypeStats = $job->getInspectDetails()->getResult()->getInfoTypeStats();
            if (count($infoTypeStats) === 0) {
                printf('No findings.' . PHP_EOL);
            } else {
                foreach ($infoTypeStats as $infoTypeStat) {
                    printf(
                        '  Found %s instance(s) of infoType %s' . PHP_EOL,
                        $infoTypeStat->getCount(),
                        $infoTypeStat->getInfoType()->getName()
                    );
                }
            }
            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        case JobState::PENDING:
            // If the timeout elapsed without a notification, $job is still the
            // object returned by createDlpJob and was never refreshed.
            printf('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
            break;
        default:
            printf('Unexpected job state. Most likely, the job is either running or has not yet started.' . PHP_EOL);
    }
}