infra-as-code/modules/ingest-pipeline/cf-audio-redaction/audio_redaction.py (157 lines of code) (raw):

# Copyright 2024 Google LLC # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import json from google.cloud import storage import ffmpeg from google.cloud import dlp import tempfile class AudioRedaction: def __init__(self, bucket_name, file_name, project_id): self.bucket_name = bucket_name self.transcript_file_name = file_name self.storage_client = storage.Client() self.project_id = project_id def redact_audio(self, event, redacted_audios_bucket_name): """ Redacts audio based on the provided event data. Args: event: A dictionary containing information about the audio file. """ try: json_file = self.download_from_gcs(self.bucket_name, self.transcript_file_name) src_bucket_name = event.get("event_bucket") audio_file_name = event.get("event_filename") print("1) Download original audio from GCS") tmp_audio_file = self.download_audio_from_gcs(src_bucket_name, audio_file_name) print("2) Call DLP and redact audio file using FFMPEG") for result in json_file['results']: print("DLP: add findings to transcript") redacted_transcript = self.redact_text(result['alternatives']) print(redacted_transcript) print("Extract intervals for redaction") redaction_intervals = self.get_redaction_intervals(result['alternatives'][0]) print(redaction_intervals) self.redact_audio_file(redaction_intervals, tmp_audio_file) print("3) Upload redacted audio to corresponding bucket in GCS") self.upload_file_to_gcs(redacted_audios_bucket_name, f"/tmp/{tmp_audio_file}", audio_file_name) print(json_file) with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_json_file: json.dump(json_file, tmp_json_file, indent=4) tmp_json_file_name = tmp_json_file.name print("4) Upload modified JSON file to GCS") self.upload_file_to_gcs(self.bucket_name, tmp_json_file_name, self.transcript_file_name) except Exception as e: print(f"An error occurred: {e}") def redact_text(self, data): data[0]['dlp'] = [] client = dlp.DlpServiceClient() inspect_config = dlp.InspectConfig( info_types= [ dlp.InfoType(name="PERSON_NAME"), dlp.InfoType(name="PHONE_NUMBER"), dlp.InfoType(name="ORGANIZATION_NAME"), dlp.InfoType(name="FIRST_NAME"), dlp.InfoType(name="LAST_NAME"), dlp.InfoType(name="EMAIL_ADDRESS"), dlp.InfoType(name="DATE_OF_BIRTH"), dlp.InfoType(name="EMAIL_ADDRESS"), dlp.InfoType(name="US_SOCIAL_SECURITY_NUMBER"), dlp.InfoType(name="STREET_ADDRESS") ], include_quote=True ) item = dlp.ContentItem( value=data[0]['transcript'], ) request = dlp.InspectContentRequest( parent=f"projects/{self.project_id}", # Correct parent construction inspect_config=inspect_config, item=item, ) response = client.inspect_content(request=request) print(response) if response.result.findings: for finding in response.result.findings: try: if finding.quote: print("Quote: {}".format(finding.quote)) data[0]['dlp'].append(finding.quote) except AttributeError: pass else: print("No findings.") return data def download_from_gcs(self, bucket_id, filename): """Downloads transcript contents from GCS Args: bucket_id (str): Bucket name filename (str): Blob name Returns: str: Transcript """ bucket = self.storage_client.get_bucket(bucket_id) blob = bucket.blob(filename) content = blob.download_as_text() json_data = json.loads(content) print('Downloaded transcript from gcs') return json_data def upload_file_to_gcs( self, bucket_name, source_file_name, destination_blob_name ): """Uploads a local audio file to GCS. Args: bucket_name: The name of the GCS bucket that will hold the file. source_file_name: The name of the source file. destination_blob_name: The name of the blob that will be uploaded to GCS. project_id: The project ID (not number) to use for redaction. impersonated_service_account: The service account to impersonate. """ bucket = self.storage_client.bucket(bucket_name) blob = bucket.blob(destination_blob_name) blob.upload_from_filename(source_file_name) os.remove(source_file_name) def download_audio_from_gcs(self, src_bucket_name, audio_file_name): """Downloads an audio file from Google Cloud Storage to a temporary location. Args: bucket_id (str): Bucket name filename (str): Blob name Returns: str: audio """ try: bucket = self.storage_client.get_bucket(src_bucket_name) blob = bucket.blob(audio_file_name) tmp_audio_file = f"{audio_file_name.split("/")[1]}" destination_file_name = f"/tmp/{tmp_audio_file}" blob.download_to_filename(destination_file_name) # 1. File size comparison if os.path.isdir(f'/tmp/{tmp_audio_file}'): print(f'Error: {f'/tmp/{tmp_audio_file}'} is a directory, not a file.') else: file_size = os.path.getsize(f'/tmp/{tmp_audio_file}') print(f'File downloaded. Size: {file_size} bytes') print('Downloaded audio from gcs') return tmp_audio_file except Exception as e: print(f"Error uploading file: {e}") return '' def redact_audio_file(self, redaction_intervals, tmp_audio_file): """Redacts the audio file using ffmpeg.""" try: volume_filters = [] for element in redaction_intervals: filter_str = f"volume=enable='between(t,{element['startOffset'].replace("s", "")},{element['endOffset'].replace("s", "")})':volume=0" volume_filters.append(filter_str) filter_graph = ",".join(volume_filters) if len(volume_filters) == 0: return with tempfile.NamedTemporaryFile(suffix='.flac', delete=False) as temp_output: temp_output_path = temp_output.name ( ffmpeg .input(f"/tmp/{tmp_audio_file}") .output(temp_output_path, af=filter_graph) .overwrite_output() .run() ) # Replace the original file with the temporary file os.replace(temp_output_path, f"/tmp/{tmp_audio_file}") except ffmpeg.Error as e: print(f"Error al procesar el audio: {e}") except Exception as e: print(f"Error redacting audio file ffmpeg: {e}") return '' def get_redaction_intervals(self, json_file): """Extracts the redaction intervals from the JSON file.""" redaction_intervals = [] word_list = [word_item['word'] for word_item in json_file['words']] for dlp_entry in json_file['dlp']: phrase = dlp_entry.split(' ') if len(phrase) >= 2: for index, word in enumerate(phrase): for i, element in enumerate(json_file['words']): if i < len(json_file['words']) - 1 and index < len(phrase) - 1: if word == element['word']: if word_list[i:i + len(phrase)] == phrase: element['index'] = i found_words = json_file['words'][i:i + len(phrase)] redaction_intervals.extend(found_words) else: for index, word in enumerate(phrase): for i, element in enumerate(json_file['words']): if word == element['word']: element['index'] = i redaction_intervals.append(element) return redaction_intervals