# dlp/snippets/Risk/k_map.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample app that uses the Data Loss Prevent API to perform risk anaylsis."""
import argparse
# [START dlp_k_map]
import concurrent.futures
from typing import List
import google.cloud.dlp
from google.cloud.dlp_v2 import types
import google.cloud.pubsub
def k_map_estimate_analysis(
    project: str,
    table_project_id: str,
    dataset_id: str,
    table_id: str,
    topic_id: str,
    subscription_id: str,
    quasi_ids: List[str],
    info_types: List[str],
    region_code: str = "US",
    timeout: int = 300,
) -> None:
    """Uses the Data Loss Prevention API to compute the k-map risk estimation
    of a column set in a Google BigQuery table.

    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        quasi_ids: A set of columns that form a composite key and optionally
            their re-identification distributions.
        info_types: Type of information of the quasi_id in order to provide a
            statistical model of population. Must be the same length as
            ``quasi_ids`` (paired element-wise).
        region_code: The ISO 3166-1 region code that the data is representative
            of. Can be omitted if using a region-specific infoType (such as
            US_ZIP_5)
        timeout: The number of seconds to wait for a response from the API.

    Raises:
        ValueError: If ``quasi_ids`` and ``info_types`` differ in length.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    # Create helper function for unpacking values.
    def get_values(obj: types.Value) -> int:
        return int(obj.integer_value)

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the project id into full resource ids.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    parent = f"projects/{project}/locations/global"

    # Location info of the BigQuery table.
    source_table = {
        "project_id": table_project_id,
        "dataset_id": dataset_id,
        "table_id": table_id,
    }

    # Check that numbers of quasi-ids and info types are equal.
    if len(quasi_ids) != len(info_types):
        raise ValueError(
            """Number of infoTypes and number of quasi-identifiers
            must be equal!"""
        )

    # Pair each quasi-identifier column with its info type. Build a concrete
    # list (not a lazy one-shot `map` object) so the value can be safely
    # embedded in the request body.
    quasi_ids = [
        {"field": {"name": quasi_id}, "info_type": {"name": info_type}}
        for quasi_id, info_type in zip(quasi_ids, info_types)
    ]

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": topic}}]

    # Configure the risk analysis job.
    risk_job = {
        "privacy_metric": {
            "k_map_estimation_config": {
                "quasi_ids": quasi_ids,
                "region_code": region_code,
            }
        },
        "source_table": source_table,
        "actions": actions,
    }

    # Call the API to start the risk analysis job.
    operation = dlp.create_dlp_job(request={"parent": parent, "risk_job": risk_job})

    def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
        """Handle a Pub/Sub message; print results if it is for our job."""
        if message.attributes["DlpJobName"] == operation.name:
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(request={"name": operation.name})
            print(f"Job name: {job.name}")
            histogram_buckets = (
                job.risk_details.k_map_estimation_result.k_map_estimation_histogram
            )

            # Print bucket stats.
            for i, bucket in enumerate(histogram_buckets):
                print(f"Bucket {i}:")
                print(
                    " Anonymity range: [{}, {}]".format(
                        bucket.min_anonymity, bucket.max_anonymity
                    )
                )
                print(f" Size: {bucket.bucket_size}")
                for value_bucket in bucket.bucket_values:
                    # Materialize the values before printing; a bare `map`
                    # object would print as "<map object at 0x...>".
                    print(
                        " Values: {}".format(
                            [get_values(v) for v in value_bucket.quasi_ids_values]
                        )
                    )
                    print(
                        " Estimated k-map anonymity: {}".format(
                            value_bucket.estimated_anonymity
                        )
                    )
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    try:
        subscription.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )
    finally:
        # Always shut down the streaming-pull future, even if result()
        # raised something other than TimeoutError.
        subscription.close()
# [END dlp_k_map]
if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"project",
help="The Google Cloud project id to use as a parent resource.",
)
parser.add_argument(
"table_project_id",
help="The Google Cloud project id where the BigQuery table is stored.",
)
parser.add_argument("dataset_id", help="The id of the dataset to inspect.")
parser.add_argument("table_id", help="The id of the table to inspect.")
parser.add_argument(
"topic_id",
help="The name of the Pub/Sub topic to notify once the job completes.",
)
parser.add_argument(
"subscription_id",
help="The name of the Pub/Sub subscription to use when listening for"
"job completion notifications.",
)
parser.add_argument(
"quasi_ids",
nargs="+",
help="A set of columns that form a composite key.",
)
parser.add_argument(
"-t",
"--info-types",
nargs="+",
help="Type of information of the quasi_id in order to provide a"
"statistical model of population.",
required=True,
)
parser.add_argument(
"-r",
"--region-code",
default="US",
help="The ISO 3166-1 region code that the data is representative of.",
)
parser.add_argument(
"--timeout",
type=int,
help="The number of seconds to wait for a response from the API.",
)
args = parser.parse_args()
k_map_estimate_analysis(
args.project,
args.table_project_id,
args.dataset_id,
args.table_id,
args.topic_id,
args.subscription_id,
args.quasi_ids,
args.info_types,
region_code=args.region_code,
timeout=args.timeout,
)