dlp/snippets/Inspect/inspect_bigquery_send_to_scc.py (83 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample app that uses the Data Loss Prevention API to inspect a public
BigQuery table and send the results to Security Command Center."""
import argparse
# [START dlp_inspect_bigquery_send_to_scc]
import time
from typing import List
import google.cloud.dlp
def inspect_bigquery_send_to_scc(
    project: str,
    info_types: List[str],
    max_findings: int = 100,
) -> None:
    """Inspect a public BigQuery table and publish the summary to SCC.

    Creates a Data Loss Prevention inspection job over the public
    ``bigquery-public-data.usa_names.usa_1910_current`` table with an
    action that publishes a summary of the findings to Security Command
    Center, polls the job until it reaches a terminal state or the polling
    budget is exhausted, and prints the per-infoType finding counts.

    Args:
        project: The Google Cloud project id to use as a parent resource.
        info_types: A list of strings representing infoTypes to inspect for.
            A full list of infoType categories can be fetched from the API.
        max_findings: The maximum number of findings to report; 0 = no maximum
    """
    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the list of infoType names into the list-of-dicts form the
    # API expects, without rebinding the caller-visible parameter.
    info_type_configs = [{"name": info_type} for info_type in info_types]

    # Construct the inspection configuration.
    inspect_config = {
        "info_types": info_type_configs,
        "min_likelihood": google.cloud.dlp_v2.Likelihood.UNLIKELY,
        "limits": {"max_findings_per_request": max_findings},
        "include_quote": True,
    }

    # Construct the storage configuration pointing at the public BigQuery
    # table to inspect.
    storage_config = {
        "big_query_options": {
            "table_reference": {
                "project_id": "bigquery-public-data",
                "dataset_id": "usa_names",
                "table_id": "usa_1910_current",
            }
        }
    }

    # Ask DLP to publish a summary of the findings to Security Command
    # Center once the job finishes.
    actions = [{"publish_summary_to_cscc": {}}]

    # Construct the job definition.
    inspect_job = {
        "inspect_config": inspect_config,
        "storage_config": storage_config,
        "actions": actions,
    }

    # Convert the project id into a full resource id.
    parent = f"projects/{project}"

    # Call the API.
    response = dlp.create_dlp_job(
        request={
            "parent": parent,
            "inspect_job": inspect_job,
        }
    )
    print(f"Inspection Job started : {response.name}")

    job_name = response.name
    job_states = google.cloud.dlp_v2.DlpJob.JobState

    # Poll for a maximum of 15 minutes (30 attempts, 30 seconds apart).
    max_attempts = 30
    poll_interval_seconds = 30

    for _ in range(max_attempts):
        # Get the DLP job status.
        dlp_job = dlp.get_dlp_job(request={"name": job_name})

        # Stop polling as soon as the job reaches a terminal state.
        if dlp_job.state == job_states.DONE:
            break
        if dlp_job.state == job_states.FAILED:
            print("Job Failed, Please check the configuration.")
            return
        if dlp_job.state == job_states.CANCELED:
            print("Job was canceled before it completed.")
            return

        # Sleep for a short duration before checking the job status again.
        time.sleep(poll_interval_seconds)
    else:
        # The loop ran out of attempts without seeing DONE: do not report
        # results for a job that is still pending or running.
        waited_minutes = max_attempts * poll_interval_seconds // 60
        print(
            f"Job {job_name} did not complete within {waited_minutes} "
            "minutes; check its status later."
        )
        return

    # Print out the results.
    print(f"Job name: {dlp_job.name}")

    result = dlp_job.inspect_details.result
    if result.info_type_stats:
        for stats in result.info_type_stats:
            print(f"Info type: {stats.info_type.name}")
            print(f"Count: {stats.count}")
    else:
        print("No findings.")
# [END dlp_inspect_bigquery_send_to_scc]
if __name__ == "__main__":
    # Command-line entry point: parse the flags and run the sample.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--project",
        # Without a project the parent resource would become "projects/None".
        required=True,
        help="The Google Cloud project id to use as a parent resource.",
    )
    parser.add_argument(
        "--info_types",
        action="append",
        # Omitting the flag would pass None, which the sample cannot iterate.
        required=True,
        help="Strings representing infoTypes to look for. A full list of "
        "info categories and types is available from the API. Examples "
        'include "FIRST_NAME", "LAST_NAME", "EMAIL_ADDRESS".',
    )
    parser.add_argument(
        "--max_findings",
        type=int,
        # Mirror the function's default so an omitted flag does not pass None.
        default=100,
        help="The maximum number of findings to report; 0 = no maximum.",
    )

    args = parser.parse_args()

    inspect_bigquery_send_to_scc(
        args.project,
        args.info_types,
        args.max_findings,
    )