# data-loss-prevention/code/redact/app.py (75 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from flask import Flask, request
from google.cloud import dlp_v2
import ast
import base64
import logging
import json
# pylint: disable=C0103
# WSGI application object; the route decorator in this module registers
# its handler on it.
app = Flask(__name__)
# Root logger prints only the message text, for records at INFO and above.
logging.basicConfig(level=logging.INFO, format='%(message)s')
def redact_dlp_item(project_id: str, inspect_item: str):
    """Replace sensitive findings in a string via the Cloud DLP API.

    The text is sent to ``deidentify_content`` with an inspection config
    restricted to these info types:
        - PERSON_NAME
        - US_SOCIAL_SECURITY_NUMBER
        - EMAIL_ADDRESS
        - PHONE_NUMBER
    A single replace transformation is configured, substituting the
    literal "[SENSITIVE DATA]".

    Args:
        project_id (str): Google Cloud project ID; used to build the
            request parent ``projects/<project_id>``.
        inspect_item (str): The text to inspect and redact.

    Returns:
        str: ``item.value`` from the DLP response, i.e. the redacted text.
    """
    dlp_client = dlp_v2.DlpServiceClient()

    info_type_names = (
        "PERSON_NAME",
        "US_SOCIAL_SECURITY_NUMBER",
        "EMAIL_ADDRESS",
        "PHONE_NUMBER",
    )
    # One transformation with no info-type filter of its own.
    replace_with_marker = {
        "primitive_transformation": {
            "replace_config": {
                "new_value": {"string_value": "[SENSITIVE DATA]"}
            }
        }
    }
    dlp_request = {
        "parent": f"projects/{project_id}",
        "deidentify_config": {
            "info_type_transformations": {
                "transformations": [replace_with_marker]
            }
        },
        "inspect_config": {
            "info_types": [{"name": name} for name in info_type_names]
        },
        "item": {"value": inspect_item},
    }

    dlp_response = dlp_client.deidentify_content(request=dlp_request)
    return dlp_response.item.value
def _bad_request(msg: str):
    """Log *msg* as an error and return the matching 400 response tuple."""
    logging.error("error: %s", msg)
    return f"Bad Request: {msg}", 400


@app.route("/", methods=["POST"])
def index():
    """Handle a Pub/Sub push request and log its payload with PII redacted.

    The request body is expected to be a JSON envelope of the form
    ``{"message": {"data": <base64>}}``. The base64-decoded data must be a
    JSON object whose ``textPayload`` field is the text of a Python dict
    literal. Every value of that dict is passed through
    ``redact_dlp_item`` (non-string values are converted with ``str()``
    first, because the DLP item value is text) and the resulting dict is
    logged at INFO level. Unredacted payload content is never logged.

    Exceptions raised by ``redact_dlp_item`` are not caught here, so a DLP
    failure still surfaces as a server error.

    Returns:
        tuple: ``("", 204)`` when the message was processed, carried no
        ``data``, or its payload was not a dict;
        ``("Bad Request: ...", 400)`` when the envelope or its payload
        cannot be parsed; ``("Internal Server Error: ...", 500)`` when the
        ``PROJECT_ID`` environment variable is not set.
    """
    # silent=True: a wrong Content-Type or malformed JSON yields None, so
    # the explicit 400 below is returned instead of a framework error.
    envelope = request.get_json(silent=True)
    if not envelope:
        return _bad_request("no Pub/Sub message received")
    if not isinstance(envelope, dict) or "message" not in envelope:
        return _bad_request("invalid Pub/Sub message format")

    pubsub_message = envelope["message"]
    if not isinstance(pubsub_message, dict) or "data" not in pubsub_message:
        # Nothing to redact in this message.
        return ("", 204)

    try:
        log_entry = json.loads(
            base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()
        )
        payload = log_entry["textPayload"]
    except (ValueError, TypeError, KeyError):
        # ValueError covers invalid base64, invalid UTF-8 and invalid JSON;
        # TypeError/KeyError cover data that is not an object with a
        # "textPayload" field. The raw data is deliberately not logged.
        return _bad_request("message data is not a JSON log entry")

    try:
        # literal_eval only evaluates literals, but it still raises on
        # malformed or overly deep input.
        fields = ast.literal_eval(payload)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return _bad_request("textPayload is not a valid Python literal")

    if not isinstance(fields, dict):
        # Only dict payloads are redacted; anything else is skipped
        # without echoing its (unredacted) content.
        logging.warning("textPayload is not a dict; nothing redacted")
        return ("", 204)

    project_id = os.environ.get("PROJECT_ID")
    if not project_id:
        msg = "PROJECT_ID environment variable is not set"
        logging.error("error: %s", msg)
        return f"Internal Server Error: {msg}", 500

    redacted = {
        key: redact_dlp_item(
            project_id=project_id,
            inspect_item=value if isinstance(value, str) else str(value),
        )
        for key, value in fields.items()
    }
    logging.info(redacted)
    return ("", 204)
if __name__ == '__main__':
    # Direct-execution entry point: listen on all interfaces, on the port
    # named by $PORT (falling back to '8080'), with debug mode off.
    app.run(
        host='0.0.0.0',
        port=os.environ.get('PORT', '8080'),
        debug=False,
    )