infra-as-code/modules/audio-data-format-change/function-source-code/lib.py (221 lines of code) (raw):

import glob
import hashlib
import hmac
import json
import logging
import os
import re
import subprocess
import uuid
from datetime import datetime

import google.auth
import google.cloud.logging
from google.cloud import secretmanager
from google.cloud import storage

from record import RecordKeeper


class AudioFormatterRunner:
    """Converts one raw audio object from GCS to FLAC and uploads the result.

    The runner downloads the triggering object into /tmp under a fresh UUID
    name, transcodes it with ffmpeg (unless it is already a .flac file),
    collects stream metadata with ffprobe, and uploads the FLAC file and a
    JSON metadata file to their buckets. Progress and failures are tracked
    through a RecordKeeper and Cloud Logging.
    """

    project_id: str
    raw_audio_bucket_id: str
    raw_audio_file_name: str
    raw_audio_file_extension: str
    formatted_audio_bucket_id: str
    formatted_audio_file_name: str
    metadata_bucket_id: str
    # Created per instance in __init__; a class-level dict would be shared
    # (and mutated) across every runner in the same process.
    metadata: dict
    number_of_channels: int
    hash_key: bytes
    # Date prefix used for uploaded blob names; filled lazily by
    # get_file_creation_date so every blob of one run shares the same prefix.
    create_date: str
    ingest_record_bucket_id: str
    storage_client = None

    # Keys copied from the ffprobe stream dump into metadata['stream'].
    _STREAM_FIELDS = (
        'codec_name',
        'sample_rate',
        'channels',
        'channel_layout',
        'start_time',
        'duration',
        'bits_per_raw_sample',
    )

    def __init__(
        self,
        project_id,
        raw_audio_bucket_id,
        raw_audio_file_name,
        formatted_audio_bucket_id,
        metadata_bucket_id,
        hash_key,
        ingest_record_bucket_id,
        number_of_channels = None
    ):
        """Stores the configuration and creates the GCP clients.

        Args:
            project_id (string): GCP project that owns the buckets and secret
            raw_audio_bucket_id (string): Bucket holding the triggering object
            raw_audio_file_name (string): Object name of the triggering file
            formatted_audio_bucket_id (string): Bucket receiving the FLAC file
            metadata_bucket_id (string): Bucket receiving the JSON metadata
            hash_key (string): Secret Manager secret ID whose latest version
                holds the hex-encoded HMAC key
            ingest_record_bucket_id (string): Bucket passed to RecordKeeper
            number_of_channels (int, optional): Value for ffmpeg's -ac option.
                Falls back to 2 when omitted or falsy.
        """
        self.project_id = project_id
        self.raw_audio_bucket_id = raw_audio_bucket_id
        self.raw_audio_file_name = raw_audio_file_name
        self.formatted_audio_bucket_id = formatted_audio_bucket_id
        self.metadata_bucket_id = metadata_bucket_id
        self.number_of_channels = number_of_channels if number_of_channels else 2
        self.ingest_record_bucket_id = ingest_record_bucket_id
        self.metadata = {}

        creds = self.get_credentials()
        self.storage_client = storage.Client(project = self.project_id, credentials = creds)

        secretmanager_client = secretmanager.SecretManagerServiceClient(credentials = creds)
        secret_path = f"projects/{project_id}/secrets/{hash_key}/versions/latest"
        secret_key = secretmanager_client.access_secret_version(name = secret_path).payload.data.decode("UTF-8")
        # The secret payload is the key as a hex string; keep the raw bytes.
        self.hash_key = bytes.fromhex(secret_key)

    def get_credentials(self):
        """Returns the application default credentials with cloud-platform scope."""
        creds, _ = google.auth.default(
            scopes=['https://www.googleapis.com/auth/cloud-platform'])
        print('Getting credentials')
        return creds

    def get_folder_and_filename(self, blob):
        """Reduces an object name to its last folder and file name.

        Args:
            blob (string): Object name, optionally containing '/' separators

        Returns:
            string: '<folder>/<filename>'. When the name has no '/', the
            folder is empty and the result starts with '/'.
        """
        parts = blob.split("/")
        if len(parts) != 1:
            filename = parts[-1]
            folder = parts[-2]
        else:
            filename = self.raw_audio_file_name
            folder = ''
        print(f'Got folder name: {folder}, filename: {filename}')
        # NOTE: this exact string is the HMAC input in download_from_gcs, so
        # its shape (including the leading '/') must stay stable.
        return f"{folder}/{filename}"

    def _with_extension(self, filename, new_extension):
        """Returns filename with its trailing raw extension swapped for new_extension.

        Only the final '.<raw extension>' suffix is replaced, so an extension
        such as 'aac' that also occurs inside the UUID stem is left untouched.
        """
        stem, dot, extension = filename.rpartition('.')
        if dot and extension == self.raw_audio_file_extension:
            return f'{stem}.{new_extension}'
        return f'{filename}.{new_extension}'

    def download_from_gcs(self):
        """Downloads the triggering blob from the raw audio bucket into /tmp
        under a new UUID-based file name, and records the hashed original
        name and the UUID in the metadata

        Returns:
            tuple: (new file name assigned to the downloaded blob,
            '<folder>/<filename>' of the triggering object)
        """
        print('Starting download')
        trigger_file = self.get_folder_and_filename(self.raw_audio_file_name)

        # Take the extension from the file name only, so a dotted folder name
        # or an extension-less object cannot leak a '/' into the local path.
        base_name = trigger_file.rsplit('/', 1)[-1]
        _, dot, extension = base_name.rpartition('.')
        self.raw_audio_file_extension = extension if dot else ''

        new_uuid = str(uuid.uuid4())
        if self.raw_audio_file_extension:
            new_filename = f'{new_uuid}.{self.raw_audio_file_extension}'
        else:
            new_filename = new_uuid

        self.metadata['original_file_name'] = hmac.new(
            self.hash_key, trigger_file.encode(), hashlib.sha256).hexdigest()
        self.metadata['conversation_id'] = new_uuid

        filename = f'/tmp/{new_filename}'
        print(f'Filename: {self.raw_audio_file_name}. File extension: {self.raw_audio_file_extension}')

        bucket = self.storage_client.bucket(self.raw_audio_bucket_id)
        blob = bucket.blob(self.raw_audio_file_name)
        blob.download_to_filename(filename)
        print(f'Blob {self.raw_audio_file_name} downloaded to {filename}')
        return new_filename, trigger_file

    def upload_to_gcs(self, bucket_name, filename):
        """Uploads a file from /tmp to GCS under '<creation date>/<filename>'

        Args:
            bucket_name (string): Bucket ID where the filename needs to be uploaded
            filename (string): The name of the local file (inside /tmp) to upload
        """
        bucket = self.storage_client.bucket(bucket_name)
        blob_name = f'{self.get_file_creation_date()}/{filename}'
        blob = bucket.blob(blob_name)
        path_to_file = '/tmp/' + filename
        if os.path.isfile(path_to_file):
            blob.upload_from_filename(path_to_file)
            print('Uploaded file into gcs')
        else:
            # Kept best-effort as before, but no longer silent.
            print(f'Skipped upload, {path_to_file} does not exist')

    def _run_ffmpeg(self, raw_audio_path, formatted_audio_path):
        """Runs ffmpeg to transcode raw_audio_path into formatted_audio_path.

        stderr goes to /tmp/ffmpeg-log-data.txt and the ffmetadata output to
        /tmp/format-meta.txt. The command is passed as an argument list so
        that characters in the file name are never interpreted by a shell.
        """
        command = [
            'ffmpeg', '-hide_banner',
            '-i', raw_audio_path,
            '-ac', str(self.number_of_channels),
            formatted_audio_path,
            '-f', 'ffmetadata', '/tmp/format-meta.txt',
        ]
        with open('/tmp/ffmpeg-log-data.txt', 'w') as log_file:
            try:
                subprocess.run(command, stderr=log_file, check=False)
            except OSError as error:
                # ffmpeg could not be started; leave the reason in the log so
                # get_ffmpeg_error reports it like any other failure.
                log_file.write(f'Unable to run ffmpeg: {error}')

    def format_audio(self, filename, trigger_file):
        """Changes the format of the given file to a FLAC file through ffmpeg

        Unless the raw file is already a .flac, ffmpeg is run and leaves three
        outputs in /tmp: the .flac file, encoding metadata (format-meta.txt)
        and log data (ffmpeg-log-data.txt). When a non-empty .flac file
        exists afterwards, the audio metadata is collected and written out.

        Args:
            filename (string): Name of the downloaded file to encode as flac
            trigger_file (string): '<folder>/<filename>' of the original object

        Returns:
            bool: True when a non-empty .flac file is available, False otherwise
        """
        self.formatted_audio_file_name = self._with_extension(filename, 'flac')
        raw_audio_path = '/tmp/' + filename
        formatted_audio_path = '/tmp/' + self.formatted_audio_file_name

        if self.raw_audio_file_extension != 'flac':
            self._run_ffmpeg(raw_audio_path, formatted_audio_path)

        if os.path.isfile(formatted_audio_path) and os.path.getsize(formatted_audio_path) > 0:
            print(f'Formatted {filename} into {self.formatted_audio_file_name}')
            self.get_audio_metadata(formatted_audio_path)
            self.extract_metadata(filename, trigger_file)
            return True
        return False

    def extract_metadata_from_file(self):
        """Reads /tmp/audio-meta.txt (ffprobe output) and, when present,
        /tmp/format-meta.txt (ffmpeg ffmetadata output) and stores the
        selected values in the metadata.
        """
        stream = {}
        with open('/tmp/audio-meta.txt', 'r') as f:
            for line in f:
                # Exact key match: 'duration_ts' or tag lines must not
                # overwrite 'duration', and values may themselves contain '='.
                key, separator, value = line.partition('=')
                key = key.strip()
                if separator and key in self._STREAM_FIELDS:
                    stream[key] = value.strip()
        self.metadata['stream'] = stream

        file_path = '/tmp/format-meta.txt'
        if os.path.exists(file_path):
            with open(file_path, 'r') as f:
                for line in f:
                    key, separator, value = line.partition('=')
                    if separator and key.strip() == 'encoder':
                        self.metadata['encoder'] = value.strip()

    def get_audio_metadata(self, audio_path):
        """Calls ffprobe to dump the stream information of the audio file
        (codec, sample rate, channels, etc.) into /tmp/audio-meta.txt

        Args:
            audio_path (string): Path to the audio file to extract metadata from
        """
        command = [
            'ffprobe', '-loglevel', 'quiet', '-hide_banner',
            '-i', audio_path,
            '-show_streams',
        ]
        with open('/tmp/audio-meta.txt', 'w') as output_file:
            try:
                subprocess.run(command, stdout=output_file, check=False)
            except OSError as error:
                # Same outcome as a failing shell command: the file stays empty.
                print(f'Unable to run ffprobe: {error}')
        print('Extracted stream metadata')

    def get_file_creation_date(self):
        """Returns the date prefix (mm_dd_yy) used for uploaded blob names.

        The value is computed once per instance, so the upload and the later
        metadata patch address the same blob even across midnight.
        """
        cached = getattr(self, 'create_date', None)
        if not cached:
            cached = datetime.today().date().strftime("%m_%d_%y")
            self.create_date = cached
        return cached

    def extract_metadata(self, filename, trigger_filename):
        """Collects the metadata and writes it as JSON into /tmp

        Args:
            filename (string): Name of the downloaded file; the JSON file gets
                the same name with a .json extension
            trigger_filename (string): Unused, kept for interface compatibility
        """
        self.extract_metadata_from_file()
        metadata_filename = self._with_extension(filename, 'json')
        with open(f'/tmp/{metadata_filename}', 'w') as jd:
            json.dump(self.metadata, jd, indent = 2)
        print('Created metadata file')
        print(self.metadata)

    def delete_tmp_files(self):
        """Deletes every regular file in /tmp except the one named 'run'
        """
        for filename in os.listdir('/tmp'):
            file_path = os.path.join('/tmp', filename)
            if os.path.isfile(file_path) and filename != 'run':
                os.remove(file_path)
        print('Deleted tmp files')

    def upload_resources(self, filename):
        """Uploads encoded .flac file and its metadata to their respective
        buckets, then attaches the metadata to the uploaded .flac blob

        Args:
            filename (string): Name of the downloaded file that was encoded as flac
        """
        # Upload .flac
        self.upload_to_gcs(self.formatted_audio_bucket_id, self.formatted_audio_file_name)
        # Upload metadata
        metadata_filename = self._with_extension(filename, 'json')
        self.upload_to_gcs(self.metadata_bucket_id, metadata_filename)
        self.set_blob_metadata()

    def get_ffmpeg_error(self):
        """Retrieves the content of the ffmpeg log file

        Returns:
            str: ffmpeg log content, or None when the log is empty or was
            never written (ffmpeg is not run for .flac input)
        """
        try:
            with open("/tmp/ffmpeg-log-data.txt", "r") as f:
                error_message = f.read()
        except FileNotFoundError:
            return None
        return error_message or None

    def get_log_entry(self, error_message):
        """Builds the structured payload sent to Cloud Logging

        Args:
            error_message (string): Error text to include in the entry

        Returns:
            dict: Entry with the message, error, raw file GCS path and hashed
            file name
        """
        entry = dict()
        entry['message'] = 'Audio format change failure'
        entry['error'] = error_message
        entry['raw_file_gcs_path'] = 'gs://{}/{}'.format(self.raw_audio_bucket_id, self.raw_audio_file_name)
        entry['hashed_filename'] = self.metadata.get('original_file_name')
        return entry

    def log_error(self, error_entry = None, severity = "ERROR"):
        """Logs an error or warning in Cloud Logging

        Args:
            error_entry (str, optional): Error text for the log entry. When
                omitted, the ffmpeg log content is used and the entry is
                always logged as an error. Defaults to None.
            severity (str, optional): Severity used when error_entry is given.
                Defaults to "ERROR".
        """
        creds = self.get_credentials()
        client = google.cloud.logging.Client(project = self.project_id, credentials = creds)
        logger = client.logger(name="cf_audio_format_change_logger")

        if error_entry:
            entry = self.get_log_entry(error_entry)
            logger.log_struct(entry, severity=severity)
            print('Error logged')
        else:
            error_message = self.get_ffmpeg_error()
            entry = self.get_log_entry(error_message)
            logger.log_struct(entry, severity="ERROR")
            print('Error logged, file format change failed')

    def set_blob_metadata(self):
        """Sets the collected metadata on the uploaded .flac blob.

        Raises:
            RuntimeError: When the uploaded blob cannot be found
        """
        bucket = self.storage_client.bucket(self.formatted_audio_bucket_id)
        blob_name = self.get_file_creation_date() + '/' + self.formatted_audio_file_name
        blob = bucket.get_blob(blob_name)
        if blob is None:
            raise RuntimeError(
                f'Blob {blob_name} not found in bucket {self.formatted_audio_bucket_id}')
        # NOTE(review): self.metadata holds a nested 'stream' dict - confirm
        # the storage API accepts non-string custom metadata values.
        blob.metadata = self.metadata
        blob.patch()
        print("Updated formatted file metadata")

    def run_format(self):
        """Runs the whole format change for the triggering file.

        Downloads the file, asks RecordKeeper.verify_file (defined in the
        record module, not shown here) to validate it, converts it and uploads
        the results. A failed conversion or any exception raised after the
        download is written to the ingest record and to Cloud Logging.
        Temporary files are removed in every case.
        """
        try:
            filename, trigger_file = self.download_from_gcs()
            self.record_keeper = RecordKeeper(
                self.ingest_record_bucket_id,
                self.metadata['original_file_name'],
                self.storage_client)
            try:
                self.record_keeper.verify_file()
                print(f'New assigned filename: {filename}')
                if self.format_audio(filename, trigger_file):
                    # Successfully changed format
                    print('Changed format')
                    self.upload_resources(filename)
                else:
                    # Unsuccessful format change, write on error and delete from processed
                    print('Unsucessful')
                    self.record_keeper.replace_row(
                        self.record_keeper.create_error_record(
                            f'An error ocurred while changing audio format: {self.get_ffmpeg_error()}'))
                    self.log_error()
            except Exception as e:
                # Every failure is currently recorded and logged as an error.
                self.record_keeper.replace_row(
                    self.record_keeper.create_error_record(
                        f'An error ocurred while changing audio format: {str(e)}'))
                self.log_error(str(e))
        finally:
            # Clean up even when the download or the error handling itself fails.
            self.delete_tmp_files()
        print("Finished")