# utils/audio_upload.py

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Imports conversations into Contact Center AI Insights.

Accepts a GCS bucket of source audio files. For every audio file, a
corresponding conversation will be created in Insights, using the Audio
Upload feature which transcribes the audio using an STT v2 recognizer and
performs redaction and model adaptation using DLP and phrase set templates
respectively.
"""

import argparse
import os
import time
import xml.etree.ElementTree as ET

import google.auth
from google.auth import impersonated_credentials
import google.auth.transport.requests
from google.cloud import contact_center_insights_v1
from google.cloud import storage
import requests


def _IsSet(flag_value):
  """Returns True if a string flag was meaningfully provided.

  Treats both Python ``None`` (the argparse default) and the literal string
  ``'None'`` (sometimes passed explicitly on the command line) as "unset".

  Args:
    flag_value: The raw flag value.

  Returns:
    True when the flag holds a usable value.
  """
  return flag_value is not None and flag_value != 'None'


def _ParseArgs():
  """Parses script arguments.

  Returns:
    The parsed argparse.Namespace.
  """
  parser = argparse.ArgumentParser()
  source_group = parser.add_argument_group(title='Bucket name or local path')
  source_group.add_argument(
      '--source_audio_gcs_bucket',
      default=None,
      help=(
          'Path to a GCS bucket containing audio files to process as input. '
          'This bucket can be the same as the source bucket to colocate audio '
          'and transcript.'
      ),
  )
  parser.add_argument(
      'project_id',
      help='Project ID (not number) for the project to own the conversation.',
  )
  parser.add_argument(
      '--impersonated_service_account',
      help=(
          'A service account to impersonate. If specified, then GCP requests '
          'will be authenticated using service account impersonation. '
          'Otherwise, the gcloud default credential will be used.'
      ),
  )
  parser.add_argument(
      '--analyze',
      default='False',
      # NOTE(review): this flag is currently parsed but never acted on; the
      # help text previously said "Default true", contradicting the default.
      help='Whether to analyze imported conversations. Default False.',
  )
  parser.add_argument(
      '--insights_endpoint',
      default='contactcenterinsights.googleapis.com',
      help='Name for the Insights endpoint to call',
  )
  parser.add_argument(
      '--language_code',
      default='en-US',
      help='Language code for all imported data.',
  )
  parser.add_argument(
      '--insights_api_version',
      default='v1',
      help='Insights API version. Options include `v1` and `v1alpha1`.',
  )
  parser.add_argument(
      '--agent_id',
      help='Agent identifier to attach to the created Insights conversations.',
  )
  parser.add_argument(
      '--agent_channel',
      default=2,
      help='Agent channel to attach to the created Insights conversations.',
  )
  parser.add_argument(
      '--xml_gcs_bucket',
      default=None,
      help='XML path to add labels to conversations.',
  )
  parser.add_argument(
      '--inspect_template',
      default=None,
      help='Template used for inspecting info types in DLP.',
  )
  parser.add_argument(
      '--deidentify_template',
      default=None,
      help='Template used for deidentifying info types in DLP.',
  )
  parser.add_argument(
      '--audio_format', default=None, help='Audio file format/extension.'
  )
  return parser.parse_args()


def _DownloadFileFromGcs(
    bucket_name,
    source_blob_name,
    filename,
    project_id,
    impersonated_service_account,
):
  """Downloads a blob from the bucket.

  Args:
    bucket_name: The name of the GCS bucket that holds the file.
    source_blob_name: Object blob to download.
    filename: Destination file name on local disk.
    project_id: The project ID (not number) to use.
    impersonated_service_account: The service account to impersonate.
  """
  storage_client = storage.Client(
      project=project_id,
      credentials=_GetClientCredentials(impersonated_service_account),
  )
  bucket = storage_client.bucket(bucket_name)
  blob = bucket.blob(source_blob_name)
  blob.download_to_filename(filename)


def _UploadInsightsConversation(
    endpoint,
    api_version,
    project,
    gcs_audio_uri,
    metadata,
    impersonated_service_account,
    inspect_template,
    deidentify_template,
    medium=None,
    agent_channel=1,
):
  """Uploads a conversation to Insights via the conversations:upload REST API.

  Args:
    endpoint: The Insights endpoint to use (prod, staging, etc).
    api_version: The Insights API version to use.
    project: The project that will own the conversation.
    gcs_audio_uri: The URI of the GCS audio file.
    metadata: Dictionary of metadata about the conversation. ``agent_id`` and
      ``agent_name`` keys are special-cased; remaining keys become labels.
    impersonated_service_account: The service account to impersonate.
    inspect_template: DLP template to inspect info types in transcripts.
    deidentify_template: DLP template to deidentify info types in transcripts.
    medium: The medium of the conversation. Currently unused; the API defaults
      to voice.
    agent_channel: Channel on which agent utterances are recorded.

  Returns:
    The long-running operation name for the created conversation.

  Raises:
    requests.HTTPError: If the upload request fails.
  """
  oauth_token = _GetOauthToken(impersonated_service_account)
  url = (
      'https://{}/{}/projects/{}/locations/us-central1/conversations:upload'
  ).format(endpoint, api_version, project)
  headers = {
      'charset': 'utf-8',
      'Content-type': 'application/json',
      'Authorization': 'Bearer {}'.format(oauth_token),
  }
  data = {'data_source': {'gcs_source': {'audio_uri': gcs_audio_uri}}}
  if metadata:
    # Copy so the caller's dictionary is not mutated by the pops below.
    metadata = dict(metadata)
    if metadata.get('agent_id'):
      data['agent_id'] = metadata.pop('agent_id')
    if metadata.get('agent_name'):
      # Agent names are not uploaded; drop the key so it does not leak into
      # the conversation labels.
      metadata.pop('agent_name')
    if metadata:
      # Remaining metadata entries are attached as conversation labels.
      data['labels'] = metadata
  if agent_channel:
    data['call_metadata'] = {}
    if agent_channel == 1 or agent_channel == 3:
      # NOTE(review): channel 3 is mapped onto agent channel 1 here —
      # presumably a customer-specific convention; confirm before changing.
      data['call_metadata']['customer_channel'] = 2
      data['call_metadata']['agent_channel'] = 1
    else:
      data['call_metadata']['customer_channel'] = 1
      data['call_metadata']['agent_channel'] = agent_channel
  if _IsSet(inspect_template):
    # Only attach a redaction config when a real template was supplied.
    # (Previously `inspect_template != 'None'` was also true for Python None,
    # which sent a null inspect_template to the API.)
    redaction_config = {'inspect_template': inspect_template}
    if _IsSet(deidentify_template):
      redaction_config['deidentify_template'] = deidentify_template
    conversation = {'conversation': data, 'redaction_config': redaction_config}
  else:
    conversation = {'conversation': data}
  r = requests.post(url, headers=headers, json=conversation)
  if r.status_code == requests.codes.ok:
    return r.json()['name']
  else:
    r.raise_for_status()


def _GetGcsUri(bucket, object_name):
  """Returns a GCS uri for the given bucket and object.

  Args:
    bucket: The bucket in the URI.
    object_name: The name of the object.

  Returns:
    The GCS uri.
  """
  return 'gs://{}/{}'.format(bucket, object_name)


def _GetGcsUris(bucket, project_id, impersonated_service_account, uri=True):
  """Returns a list of GCS uris (or names) and metadata for files in a bucket.

  Args:
    bucket: The GCS bucket.
    project_id: The project ID (not number) to use.
    impersonated_service_account: The service account to impersonate.
    uri: Whether to return full gs:// uris (True) or bare object names.

  Returns:
    A tuple of (uris-or-names, per-blob metadata dicts; entries may be None).
  """
  uris = []
  metadata = []
  storage_client = storage.Client(
      project=project_id,
      credentials=_GetClientCredentials(impersonated_service_account),
  )
  blobs = storage_client.list_blobs(bucket)
  for blob in blobs:
    # Blobs ending in slashes are actually directory paths.
    if not blob.name.endswith('/'):
      if uri:
        uris.append(_GetGcsUri(bucket, blob.name))
      else:
        uris.append(blob.name)
      metadata.append(blob.metadata)
  return uris, metadata


def _GetClientCredentials(impersonated_service_account):
  """Gets client credentials for GCP requests.

  If an account to impersonate is provided, then it will be used rather than
  the default. Otherwise, default gcloud credentials will be used.

  Args:
    impersonated_service_account: The service account to impersonate.

  Returns:
    A credential for requests to GCP.
  """
  creds, _ = google.auth.default(
      scopes=['https://www.googleapis.com/auth/cloud-platform']
  )
  if not impersonated_service_account:
    return creds
  target_scopes = ['https://www.googleapis.com/auth/cloud-platform']
  target_credentials = impersonated_credentials.Credentials(
      source_credentials=creds,
      target_principal=impersonated_service_account,
      target_scopes=target_scopes,
      lifetime=3600,
  )
  # Override the source credentials so refresh will work.
  # See https://github.com/googleapis/google-auth-library-python/issues/416.
  # pylint: disable=protected-access
  target_credentials._source_credentials._scopes = creds.scopes
  return target_credentials


def _GetOauthToken(impersonated_service_account):
  """Gets an oauth token to use for HTTP requests to GCP.

  Assumes usage of Gcloud credentials.

  Args:
    impersonated_service_account: The service account to impersonate.

  Returns:
    The oauth token.
  """
  creds = _GetClientCredentials(impersonated_service_account)
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  return creds.token


# Needs modification based on customer's XML structure/fields.
def _GetMetaData(file):
  """Parses an XML file to get conversation metadata.

  The element indices below are customer-specific and must be adapted to the
  actual XML schema in use.

  Args:
    file: XML file name.

  Returns:
    Dictionary of metadata extracted from the XML.
  """
  tree = ET.parse(file)
  root = tree.getroot()
  data = {}
  data['duration'] = str(root[0][1].text)
  data['agent_id'] = str(root[0][6].text)
  data['channel'] = str(root[0][8].text)
  data['sid'] = str(root[0][10].text)
  data['dbs'] = str(root[0][11].text)
  data['no_of_holds'] = str(root[0][13].text)
  data['hold_time'] = str(root[0][16].text)
  return data


def _GetOperation(operation_name):
  """Gets a long-running operation.

  Args:
    operation_name: The operation name. Format is
      'projects/{project_id}/locations/{location_id}/operations/{operation_id}'.
      For example,
      'projects/my-project/locations/us-central1/operations/123456789'.

  Returns:
    An operation.
  """
  # Construct an Insights client that will authenticate via Application
  # Default Credentials. See authentication details at
  # https://cloud.google.com/docs/authentication/production.
  insights_client = contact_center_insights_v1.ContactCenterInsightsClient()
  # Call the Insights client to get the operation.
  operation = insights_client.transport.operations_client.get_operation(
      operation_name
  )
  return operation


def _UploadBulkAudio(
    audio_uris,
    metadata,
    project_id,
    insights_endpoint,
    api_version,
    agent_id,
    impersonated_service_account,
    agent_channel,
    xml_bucket,
    inspect_template,
    deidentify_template,
    audio_format,
):
  """Creates conversations in Insights for a list of audio uris.

  Per-conversation metadata is resolved in priority order: blob metadata,
  then a sidecar XML file from ``xml_bucket``, then the ``agent_id`` flag.

  Args:
    audio_uris: The audio uris for which conversations should be created.
    metadata: Per-file metadata dicts (entries may be None).
    project_id: The project ID (not number) to use for transcription,
      redaction, and Insights.
    insights_endpoint: The Insights endpoint to send requests.
    api_version: The Insights API version to use.
    agent_id: An agent identifier to attach to the conversations.
    impersonated_service_account: The service account to impersonate.
    agent_channel: The agent track.
    xml_bucket: Bucket holding XML files for metadata ingestion.
    inspect_template: DLP template to inspect info types.
    deidentify_template: DLP template to de-identify info types.
    audio_format: Audio file format/extension; required when ``xml_bucket``
      is used so the sidecar XML name can be derived.

  Returns:
    A list of operation names for the scheduled uploads.
  """
  operation_names = []
  for audio_uri, metad in zip(audio_uris, metadata):
    meta_data = {}
    if metad:
      meta_data = metad
    elif _IsSet(xml_bucket):
      # Previously this branch also ran when --xml_gcs_bucket was omitted
      # (Python None != 'None'), crashing on a download from bucket None.
      tname = (
          audio_uri.replace('gs://', '')
          .split('/')[-1]
          .replace(f'.{audio_format}', '.xml')
      )
      xml_file_path = f'/tmp/{tname}'
      _DownloadFileFromGcs(
          xml_bucket,
          tname,
          xml_file_path,
          project_id,
          impersonated_service_account,
      )
      meta_data = _GetMetaData(xml_file_path)
      os.remove(xml_file_path)
    elif _IsSet(agent_id):
      meta_data['agent_id'] = agent_id
    operation_name = _UploadInsightsConversation(
        insights_endpoint,
        api_version,
        project_id,
        audio_uri,
        meta_data,
        impersonated_service_account,
        inspect_template,
        deidentify_template,
        medium=None,
        agent_channel=agent_channel,
    )
    operation_names.append(operation_name)
    # Sleep between uploads to avoid exceeding quota.
    time.sleep(1)
  num_operations = len(operation_names)
  print(
      'Successfully scheduled `{}` audio upload operations.'.format(
          num_operations
      )
  )
  # Uploads are scheduled fire-and-forget; use _GetOperation to poll the
  # returned operation names if completion tracking is needed.
  return operation_names


def _GetProcessedAudios(project_id):
  """Returns audio uris that already have an Insights conversation.

  Args:
    project_id: GCP project ID.

  Returns:
    A list of unique audio uris that are already processed.
  """
  audios = []
  request = contact_center_insights_v1.ListConversationsRequest(
      parent=f'projects/{project_id}/locations/us-central1',
  )
  client = contact_center_insights_v1.ContactCenterInsightsClient()
  page_result = client.list_conversations(request=request)
  for response in page_result:
    audio = response.data_source.gcs_source.audio_uri
    if audio not in audios:
      audios.append(audio)
  return audios


def _RemoveProcessedFiles(audio_uris, processed_audio_files, metadata):
  """Removes already-processed files from audio_uris.

  Args:
    audio_uris: Audio files to process.
    processed_audio_files: Audio uris that were already processed.
    metadata: Per-file metadata aligned with audio_uris.

  Returns:
    A tuple of (unprocessed audio uris, their aligned metadata).
  """
  unprocessed_audio_uris = []
  unprocessed_metadata = []
  for file, metad in zip(audio_uris, metadata):
    if file not in processed_audio_files:
      unprocessed_audio_uris.append(file)
      unprocessed_metadata.append(metad)
  return unprocessed_audio_uris, unprocessed_metadata


def main():
  """Entry point: lists bucket audio, skips processed files, uploads the rest."""
  pargs = _ParseArgs()
  project_id = pargs.project_id
  impersonated_service_account = pargs.impersonated_service_account
  insights_endpoint = pargs.insights_endpoint
  api_version = pargs.insights_api_version
  agent_id = pargs.agent_id
  agent_channel = int(pargs.agent_channel)
  xml_bucket = pargs.xml_gcs_bucket
  inspect_template = pargs.inspect_template
  deidentify_template = pargs.deidentify_template
  audio_format = pargs.audio_format
  source_audio_gcs_bucket = pargs.source_audio_gcs_bucket
  if source_audio_gcs_bucket:
    # Inputs are audio files.
    audio_uris, metadata = _GetGcsUris(
        source_audio_gcs_bucket, project_id, impersonated_service_account
    )
    # Skip audio files that already have an Insights conversation, so the
    # script can be re-run safely after a partial import.
    processed_audio_files = _GetProcessedAudios(project_id)
    audio_uris, metadata = _RemoveProcessedFiles(
        audio_uris, processed_audio_files, metadata
    )
    if not audio_uris:
      print('No audio file to process')
    else:
      operation_names = _UploadBulkAudio(
          audio_uris,
          metadata,
          project_id,
          insights_endpoint,
          api_version,
          agent_id,
          impersonated_service_account,
          agent_channel,
          xml_bucket,
          inspect_template,
          deidentify_template,
          audio_format,
      )
      print('Uploaded `{}` conversations'.format(len(operation_names)))
  else:
    print('Please provide audio files GCS bucket')


if __name__ == '__main__':
  main()