# utils/import_conversations_v2.py
# Copyright 2024 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Imports conversations into Contact Center AI Insights.
Accepts either a local audio file or a GCS bucket of source audio files.
In the latter case, assumes that every file has the same audio properties.
Transcripts will be uploaded to the specified destination GCS bucket, and they
will be named with the same basename as the corresponding audio file.
For every audio file, a corresponding conversation will be created in Insights,
if metadata file(xml) is provided it will be added to the conversation.
Additionally, conditional on the `--analyze` flag, each conversation will also
be analyzed a single time. This allows the user to observe the results of
analysis after the script completes.
"""
import argparse
import os
import time
import xml.etree.ElementTree as ET
import google.api_core.exceptions
import google.auth
from google.auth import impersonated_credentials
import google.auth.transport.requests
from google.cloud import contact_center_insights_v1
from google.cloud import speech_v1p1beta1
from google.cloud import storage
import google.cloud.dlp_v2
from google.cloud.speech_v1p1beta1 import types as enums
import requests
import json
from urllib.parse import urlparse
from urllib.parse import urlunparse
import pickle
def _ParseArgs():
"""Parse script arguments."""
parser = argparse.ArgumentParser()
  # Create a group of args where exactly one of them must be set.
# source_group = parser.add_mutually_exclusive_group(required=True)
source_group = parser.add_argument_group(title='Bucket name or local path')
source_group.add_argument(
'--source_local_audio_path',
help='Path to a local audio file to process as input.',
)
source_group.add_argument(
'--source_audio_gcs_bucket',
default=None,
help=(
'Path to a GCS bucket containing audio files to process as input. '
          'This bucket can be the same as the destination bucket to colocate '
          'audio and transcript.'
),
)
source_group.add_argument(
'--source_voice_transcript_gcs_bucket',
help=(
'Path to a GCS bucket containing voice transcripts to process as'
' input.'
),
)
source_group.add_argument(
'--source_chat_transcript_gcs_bucket',
help=(
'Path to a GCS bucket containing chat transcripts to process as'
' input.'
),
)
parser.add_argument(
'--dest_gcs_bucket',
help=(
'Name of the GCS bucket that will hold resulting transcript files.'
' Only relevant when providing audio files as input. Otherwise'
' ignored.'
),
)
parser.add_argument(
'project_id',
help='Project ID (not number) for the project to own the conversation.',
)
parser.add_argument(
'--impersonated_service_account',
help=(
'A service account to impersonate. If specified, then GCP requests '
'will be authenticated using service account impersonation. '
'Otherwise, the gcloud default credential will be used.'
),
)
  parser.add_argument(
      '--redact',
      default='False',
      help='Whether to redact the transcripts. Default False.',
  )
  parser.add_argument(
      '--analyze',
      default='False',
      help='Whether to analyze imported conversations. Default False.',
  )
parser.add_argument(
'--insights_endpoint',
default='contactcenterinsights.googleapis.com',
help='Name for the Insights endpoint to call',
)
parser.add_argument(
'--language_code',
default='en-US',
help='Language code for all imported data.',
)
parser.add_argument(
'--encoding', default='LINEAR16', help='Encoding for all imported data.'
)
parser.add_argument(
'--sample_rate_hertz',
default=0,
type=int,
help=(
'Sample rate. If left out, Speech-to-text may infer it depending on '
'the encoding.'
),
)
parser.add_argument(
'--insights_api_version',
default='v1',
help='Insights API version. Options include `v1` and `v1alpha1`.',
)
parser.add_argument(
'--agent_id',
help='Agent identifier to attach to the created Insights conversations.',
)
parser.add_argument(
'--agent_channel',
default=2,
help='Agent channel to attach to the created Insights conversations.',
)
parser.add_argument(
'--xml_gcs_bucket',
default=None,
      help='GCS bucket of XML metadata files used to label conversations.',
)
parser.add_argument(
'--transcript_metadata_flag',
default=None,
      help=('Set to True if the metadata is present inside the '
            'transcript files.')
)
parser.add_argument(
'--folder_name',
default=None,
      help=('Folder (prefix) inside the bucket; required when transcripts are '
            'stored under a folder rather than at the bucket root.')
)
return parser.parse_args()
def _DownloadFileFromGcs(
bucket_name,
source_blob_name,
filename,
project_id,
impersonated_service_account,
):
"""Downloads a blob from the bucket.
Args:
bucket_name: The name of the GCS bucket that will hold the file.
source_blob_name: Object blob to store.
filename: Destination file name.
project_id: The project ID (not number) to use for redaction.
impersonated_service_account: The service account to impersonate.
"""
storage_client = storage.Client(
project=project_id,
credentials=_GetClientCredentials(impersonated_service_account),
)
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
blob.download_to_filename(filename)
def _UploadFileToGcs(
bucket_name,
source_file_name,
destination_blob_name,
project_id,
impersonated_service_account,
):
"""Uploads a local audio file to GCS.
Args:
bucket_name: The name of the GCS bucket that will hold the file.
source_file_name: The name of the source file.
destination_blob_name: The name of the blob that will be uploaded to GCS.
    project_id: The project ID (not number) to use for the storage client.
impersonated_service_account: The service account to impersonate.
"""
storage_client = storage.Client(
project=project_id,
credentials=_GetClientCredentials(impersonated_service_account),
)
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
def _TranscribeAsync(
storage_uri,
encoding,
language_code,
sample_rate_hertz,
impersonated_service_account,
):
"""Transcribe long audio file from Cloud Storage.
Args:
storage_uri: URI for audio file in Cloud Storage, e.g.
gs://[BUCKET]/[FILE]
encoding: The encoding to use for transcription
language_code: The language code to use for transcription
sample_rate_hertz: The sample rate of the audio
impersonated_service_account: The service account to impersonate.
Returns:
    The transcription operation, which can be polled until done, or None if
    scheduling the transcription failed.
"""
encoding_map = {
'LINEAR16': enums.RecognitionConfig.AudioEncoding.LINEAR16,
'MP3': enums.RecognitionConfig.AudioEncoding.MP3,
'FLAC': enums.RecognitionConfig.AudioEncoding.FLAC,
'AMR': enums.RecognitionConfig.AudioEncoding.AMR,
'AMR_WB': enums.RecognitionConfig.AudioEncoding.AMR_WB,
'OGG_OPUS': enums.RecognitionConfig.AudioEncoding.OGG_OPUS,
'SPEEX_WITH_HEADER_BYTE': (
enums.RecognitionConfig.AudioEncoding.SPEEX_WITH_HEADER_BYTE
),
}
# The recognition configuration. Assumes the audio is a two-channel phone
# call.
config = {
'language_code': language_code,
'encoding': encoding_map[encoding],
'model': 'phone_call',
'audio_channel_count': 2,
'use_enhanced': True,
'enable_separate_recognition_per_channel': True,
'enable_automatic_punctuation': True,
'enable_word_time_offsets': True,
}
if sample_rate_hertz > 0:
config['sample_rate_hertz'] = sample_rate_hertz
client = speech_v1p1beta1.SpeechClient(
credentials=_GetClientCredentials(impersonated_service_account)
)
  # long_running_recognize expects `config` and `audio` as keyword arguments.
  audio_file = speech_v1p1beta1.RecognitionAudio(uri=storage_uri)
try:
operation = client.long_running_recognize(config=config, audio=audio_file)
return operation
except google.api_core.exceptions.GoogleAPIError as e:
print(
'Error `{}` when scheduling async transcription for storage uri {}'
.format(e, storage_uri)
)
return None
def _Redact(transcript_response, project_id, impersonated_service_account):
"""Redacts a transcript response.
Args:
transcript_response: The response from transcription.
project_id: The project ID (not number) to use for redaction.
impersonated_service_account: The service account to impersonate.
Returns:
    The redacted (deidentified) transcription response as a string.
"""
  dlp = google.cloud.dlp_v2.DlpServiceClient(
      credentials=_GetClientCredentials(impersonated_service_account)
  )
  # The list of infoTypes to redact. Making this too aggressive can damage word
  # time offsets. A better long-term solution would be to send only the
  # transcript text to DLP, rather than the entire STT response, so that only
  # the transcript parts are candidates for redaction.
info_types = [
'AGE',
'CREDIT_CARD_NUMBER',
'CREDIT_CARD_TRACK_NUMBER',
'DOMAIN_NAME',
'EMAIL_ADDRESS',
'FEMALE_NAME',
'MALE_NAME',
'FIRST_NAME',
'GENDER',
'GENERIC_ID',
'IP_ADDRESS',
'LAST_NAME',
'LOCATION',
'PERSON_NAME',
'PHONE_NUMBER',
'STREET_ADDRESS',
]
inspect_config = {
'info_types': [{'name': info_type} for info_type in info_types]
}
deidentify_config = {
'info_type_transformations': {
'transformations': [
{
'primitive_transformation': {
'character_mask_config': {
# Will replace PII terms with a series of '*'.
'masking_character': '*',
# Zero means no limit on characters to redact.
'number_to_mask': 0,
}
}
}
]
}
}
project_path = f'projects/{project_id}'
item = {'value': str(transcript_response)}
response = dlp.deidentify_content(
request={
'parent': project_path,
'deidentify_config': deidentify_config,
'inspect_config': inspect_config,
'item': item,
}
)
return response.item.value
def _RedactTranscript(transcript_response, project_id,
                      impersonated_service_account):
  """Redacts a transcript response.

  Args:
    transcript_response: The transcript JSON string to redact.
    project_id: The project ID (not number) to use for redaction.
    impersonated_service_account: The service account to impersonate.
  Returns:
    The redacted transcript as a dict.
  """
dlp = google.cloud.dlp_v2.DlpServiceClient(
credentials=_GetClientCredentials(impersonated_service_account),
)
transcript_dict = json.loads(transcript_response)
entry_list = transcript_dict['entries']
headers = [{'name': key} for key in entry_list[0].keys()]
rows = []
for element in entry_list:
rows.append(
{
'values': [
{'string_value': str(element['start_timestamp_usec'])},
{'string_value': element['text']},
{'string_value': element['role']},
{'string_value': str(element['user_id'])},
]
}
)
items = {'table': {'headers': headers, 'rows': rows}}
info_types = [
'AGE',
'CREDIT_CARD_NUMBER',
'CREDIT_CARD_TRACK_NUMBER',
'DOMAIN_NAME',
'EMAIL_ADDRESS',
'FEMALE_NAME',
'MALE_NAME',
'FIRST_NAME',
'GENDER',
'GENERIC_ID',
'IP_ADDRESS',
'LAST_NAME',
'LOCATION',
'PERSON_NAME',
'PHONE_NUMBER',
'STREET_ADDRESS',
]
deidentify_config = {
'record_transformations': {
'field_transformations': [{
'fields': [{'name': 'text'}],
'info_type_transformations': {
'transformations': [{
'primitive_transformation': {
'character_mask_config': {'masking_character': '*'}
},
'info_types': [
{'name': info_type} for info_type in info_types
],
}]
},
}]
}
}
inspect_config = {
'info_types': [{'name': info_type} for info_type in info_types]
}
project_path = f'projects/{project_id}'
response = dlp.deidentify_content(
request={
'parent': project_path,
'deidentify_config': deidentify_config,
'inspect_config': inspect_config,
'item': items,
}
)
return_list = []
for row in response.item.table.rows:
row_list = []
for col in row.values:
if 'string_value' in col:
row_list.append(col.string_value)
return_list.append(row_list)
json_entries_list = []
for entry in return_list:
json_entry = {
'start_timestamp_usec': str(entry[0]),
'text': entry[1],
'role': entry[2],
'user_id': int(entry[3]),
}
json_entries_list.append(json_entry)
transcript_dict['entries'] = json_entries_list
return transcript_dict
def _UploadTranscript(
transcript_response,
bucket,
transcript_file_name,
project_id,
impersonated_service_account,
):
"""Uploads an audio file transcript to GCS.
Args:
transcript_response: The response from transcription.
bucket: The bucket that will hold the transcript
transcript_file_name: The name of the file that will be uploaded.
project_id: The project ID (not number) to use for redaction.
impersonated_service_account: The service account to impersonate.
"""
  # Use an identifier that is unique across transcriptions to prevent a race.
  tmp_file = 'tmp-{}-{}'.format(bucket, transcript_file_name)
  with open(tmp_file, 'w') as f:
    f.write(str(transcript_response))
_UploadFileToGcs(
bucket,
tmp_file,
transcript_file_name,
project_id,
impersonated_service_account,
)
os.remove(tmp_file)
def _CreateInsightsConversation(
endpoint,
api_version,
project,
gcs_audio_uri,
gcs_transcript_uri,
metadata,
impersonated_service_account,
medium=None,
agent_channel=1,
):
"""Creates a conversation in Insights.
Args:
endpoint: The insights endpoint to use (prod, staging, etc)
api_version: The Insights API version to use.
    project: The ID of the project that will own the conversation.
gcs_audio_uri: The uri of the GCS audio file.
gcs_transcript_uri: The uri of the GCS transcript file.
metadata: Data about the conversation.
impersonated_service_account: The service account to impersonate.
medium: The medium of the conversation. Defaults to 1 for voice.
agent_channel: The Agent track.
Returns:
    The resource name of the created conversation.
"""
oauth_token = _GetOauthToken(impersonated_service_account)
url = (
'https://{}/{}/projects/{}/locations/us-central1/conversations'
).format(endpoint, api_version, project)
headers = {
'charset': 'utf-8',
'Content-type': 'application/json',
'Authorization': 'Bearer {}'.format(oauth_token),
}
data = {'data_source': {'gcs_source': {'transcript_uri': gcs_transcript_uri}}}
if metadata:
if metadata['agent_id']:
data['agent_id'] = metadata['agent_id']
del metadata['agent_id']
# set labels
data['labels'] = metadata
if gcs_audio_uri:
data['data_source']['gcs_source']['audio_uri'] = gcs_audio_uri
if medium:
data['medium'] = medium
# set channel
if agent_channel:
data['call_metadata'] = {}
if agent_channel == 1:
data['call_metadata']['customer_channel'] = 2
else:
data['call_metadata']['customer_channel'] = 1
data['call_metadata']['agent_channel'] = agent_channel
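  # Illustrative shape of the request body assembled above (values are
  # placeholders, not real data):
  # {
  #   'data_source': {'gcs_source': {'transcript_uri': 'gs://bkt/call.json',
  #                                  'audio_uri': 'gs://bkt/call.wav'}},
  #   'agent_id': 'agent-1',
  #   'labels': {...},
  #   'medium': 1,
  #   'call_metadata': {'agent_channel': 2, 'customer_channel': 1},
  # }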
r = requests.post(url, headers=headers, json=data)
if r.status_code == requests.codes.ok:
print(
'Successfully created conversation for transcript uri `{}` '
'and audio uri `{}`.'.format(gcs_transcript_uri, gcs_audio_uri)
)
return r.json()['name']
else:
r.raise_for_status()
def _GetGcsUri(bucket, object_name):
"""Returns a GCS uri for the given bucket and object.
Args:
bucket: The bucket in the URI.
object_name: The name of the object.
Returns:
The GCS uri.
"""
return 'gs://{}/{}'.format(bucket, object_name)
def _GetGcsUris(bucket, project_id, impersonated_service_account,
folder_name=None, uri=True):
"""Returns a list of GCS uris for files in a bucket.
Args:
bucket: The GCS bucket.
project_id: The project ID (not number) to use.
impersonated_service_account: The service account to impersonate.
    folder_name: Folder (prefix) path if the files are nested inside the bucket.
    uri: If True, return full GCS URIs; if False, return bare blob names.
  Returns:
    The GCS URIs or blob names.
"""
uris = []
storage_client = storage.Client(
project=project_id,
credentials=_GetClientCredentials(impersonated_service_account))
blobs = storage_client.list_blobs(bucket, prefix=folder_name)
for blob in blobs:
# Blobs ending in slashes are actually directory paths.
if not blob.name.endswith('/'):
      # Skip blobs larger than 1e7 bytes (~10 MB) to avoid redaction size errors.
      if blob.size <= 1e7:
if uri:
uris.append(_GetGcsUri(bucket, blob.name))
else:
uris.append(blob.name)
return uris
def _GetClientCredentials(impersonated_service_account):
"""Gets client credentials for GCP requests.
If an account to impersonate is provided, then it will be used rather than the
default. Otherwise, default gcloud credentials will be used.
Args:
impersonated_service_account: The service account to impersonate.
Returns:
A credential for requests to GCP.
"""
creds, _ = google.auth.default(
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
if not impersonated_service_account:
return creds
target_scopes = ['https://www.googleapis.com/auth/cloud-platform']
target_credentials = impersonated_credentials.Credentials(
source_credentials=creds,
target_principal=impersonated_service_account,
target_scopes=target_scopes,
lifetime=3600,
)
# Override the source credentials so refresh will work.
# See https://github.com/googleapis/google-auth-library-python/issues/416.
# pylint: disable=protected-access
target_credentials._source_credentials._scopes = creds.scopes
return target_credentials
def _GetOauthToken(impersonated_service_account):
"""Gets an oauth token to use for HTTP requests to GCP.
Assumes usage of Gcloud credentials.
Args:
impersonated_service_account: The service account to impersonate.
Returns:
The oauth token.
"""
creds = _GetClientCredentials(impersonated_service_account)
auth_req = google.auth.transport.requests.Request()
creds.refresh(auth_req)
return creds.token
# Needs modification based on customer's XML structure/fields
def _GetMetaData(file):
"""Parse XML file to get metadata.
Args:
file: XML file name
Returns:
data: Dictionary consisting of metadata
"""
tree = ET.parse(file)
root = tree.getroot()
data = {}
data['duration'] = str(root[0][1].text)
data['agent_id'] = str(root[0][6].text)
data['channel'] = str(root[0][8].text)
data['sid'] = str(root[0][10].text)
data['dbs'] = str(root[0][11].text)
data['no_of_holds'] = str(root[0][13].text)
data['hold_time'] = str(root[0][16].text)
return data
def _GetMetaDataFromTranscription(file):
"""Parse JSON transcription file to get metadata.
Args:
file: JSON file name
Returns:
data: Dictionary consisting of metadata
"""
with open(file) as transcription_file:
transcription = json.load(transcription_file)
  conversation_info = transcription['conversation_info']
  print(conversation_info)
  meta_data = {}
  meta_data['ucid'] = conversation_info['ucid']
  meta_data['agent_login_id'] = conversation_info['agent_login_id']
  meta_data['dt_skey'] = conversation_info['dt_skey']
  meta_data['src_scope'] = conversation_info['src_scope']
  meta_data['src_divsn'] = conversation_info['src_divsn']
  meta_data['src_sub_divsn'] = conversation_info['src_sub_divsn']
  return meta_data
def _GetTranscribeAsyncCallback(
project_id,
dest_bucket,
audio_uri,
insights_endpoint,
api_version,
should_redact,
agent_id,
conversation_names,
impersonated_service_account,
agent_channel,
xml_bucket,
):
"""A callback for asynchronous transcription.
Args:
project_id: The GCP project ID (not number) to hold the Insights
conversation object.
dest_bucket: The destination GCS bucket for the transcript file.
audio_uri: The uri of the audio file that was transcribed.
insights_endpoint: The endpoint for the Insights API.
api_version: The Insights API version to use.
should_redact: Whether to redact the transcription before saving.
agent_id: An agent identifier to attach to the conversations.
conversation_names: The list of conversation IDs created.
impersonated_service_account: The service account to impersonate.
agent_channel: The Agent track.
    xml_bucket: Bucket where XML files are present for metadata ingestion.
Returns:
The callback for asynchronous transcription.
"""
def Callback(future):
try:
response = future.result()
# print('response generated')
except google.api_core.exceptions.GoogleAPICallError as e:
print(
'Error `{}`: failed to transcribe audio uri `{}` with operation `{}`'
' and error `{}`.'.format(
e, audio_uri, future.operation.name, future.exception()
)
)
return
# time.sleep(1)
try:
if should_redact == 'True':
# print('redacting')
response = _Redact(response, project_id, impersonated_service_account)
except google.api_core.exceptions.GoogleAPICallError as e:
print(
'Error `{}`: failed to redact transcription from audio uri `{}`.'
.format(e, audio_uri)
)
return
# print('After redacting')
# time.sleep(1)
transcript_name = '{}.txt'.format(
os.path.basename(os.path.splitext(audio_uri)[0])
)
try:
_UploadTranscript(
response,
dest_bucket,
transcript_name,
project_id,
impersonated_service_account,
)
# print('uploaded transcript')
except google.api_core.exceptions.GoogleAPICallError as e:
print(
'Error `{}`: failed to upload transcription from audio uri `{}`.'
.format(e, audio_uri)
)
return
# time.sleep(1)
transcript_uri = _GetGcsUri(dest_bucket, transcript_name)
try:
# print('transcript_uri',transcript_uri)
meta_data = {}
      if xml_bucket and xml_bucket != 'None':
        tname = transcript_name.replace('.txt', '.xml')
        xml_file_path = f'/tmp/{tname}'
_DownloadFileFromGcs(
xml_bucket,
tname,
xml_file_path,
project_id,
impersonated_service_account,
)
meta_data = _GetMetaData(xml_file_path)
# print("meta_data",meta_data)
os.remove(xml_file_path)
      elif agent_id and agent_id != 'None':
meta_data['agent_id'] = agent_id
conversation_name = _CreateInsightsConversation(
insights_endpoint,
api_version,
project_id,
audio_uri,
transcript_uri,
meta_data,
impersonated_service_account,
medium=None,
agent_channel=agent_channel,
)
# print('conversation_name',conversation_name)
# Note: Appending to a python list is thread-safe, hence no lock.
conversation_names.append(conversation_name)
except requests.exceptions.HTTPError as e:
print(
'Error `{}`: failed to create insights conversation from audio uri '
'{} and transcript uri `{}`.'.format(e, audio_uri, transcript_uri)
)
return Callback
def _AddRedactedFolderToGcsUri(gcs_uri):
"""Create an intermediate path for redacted files.
Args:
gcs_uri: GCS uri where the transcripts are stored
Returns:
Modified uri with redacted folder added before the file name
"""
parsed_url = urlparse(gcs_uri)
path_parts = parsed_url.path.split('/')
path_parts.insert(1, 'redacted')
modified_path = '/'.join(path_parts)
modified_url = parsed_url._replace(path=modified_path)
return urlunparse(modified_url)
def _ImportConversationsFromTranscript(
transcript_uris,
project_id,
medium,
insights_endpoint,
api_version,
should_redact,
agent_id,
impersonated_service_account,
agent_channel,
xml_bucket,
transcript_bucket,
transcript_metadata_flag
):
"""Create conversations in Insights for a list of transcript uris.
Args:
transcript_uris: The transcript uris for which conversations should be
created.
project_id: The project ID (not number) to use for redaction and Insights.
medium: The medium (1 for voice, 2 for chat).
insights_endpoint: The Insights endpoint to send requests.
api_version: The Insights API version to use.
should_redact: Whether to redact transcriptions with DLP.
agent_id: An agent identifier to attach to the conversations.
impersonated_service_account: The service account to impersonate.
    agent_channel: The channel of the agent.
    xml_bucket: The GCS bucket containing XML metadata files.
    transcript_bucket: The GCS bucket where the transcripts are stored.
    transcript_metadata_flag: Flag specifying whether metadata is present
      inside the transcript.
Returns:
A list of conversations IDs for the created conversations.
"""
conversation_names = []
for transcript_uri in transcript_uris:
meta_data = {}
metadata_processed = False
if should_redact == 'True':
file_name = "/".join(transcript_uri.split("/")[3:])
json_file_path = f'/tmp/{file_name.split("/")[-1]}'
_DownloadFileFromGcs(
transcript_bucket,
file_name,
json_file_path,
project_id,
impersonated_service_account
)
with open(json_file_path) as file:
transcription = file.read()
redacted_string = _RedactTranscript(transcription, project_id,
impersonated_service_account)
#type(redacted_string)
redacted_json_file_path = f'/tmp/{"redacted_"+file_name.split("/")[-1]}'
redacted_string = json.dumps(redacted_string, indent=None)
with open(redacted_json_file_path, "w") as redacted_file:
redacted_file.write(redacted_string.replace('\\',""))
redacted_blob_path = "redacted/"+file_name
transcript_uri = "gs://"+transcript_bucket+"/"+redacted_blob_path
_UploadFileToGcs(transcript_bucket, redacted_json_file_path,
redacted_blob_path, project_id,
impersonated_service_account)
os.remove(redacted_json_file_path)
      if transcript_metadata_flag == 'True':
        meta_data = _GetMetaDataFromTranscription(json_file_path)
        if agent_id and agent_id != 'None':
          meta_data['agent_id'] = agent_id
        metadata_processed = True
      os.remove(json_file_path)
    if xml_bucket and xml_bucket != 'None' and not metadata_processed:
      # Use the transcript base name to locate the matching XML blob.
      tname = os.path.basename(transcript_uri).replace('.txt', '.xml')
      xml_file_path = f'/tmp/{tname}'
_DownloadFileFromGcs(
xml_bucket,
tname,
xml_file_path,
project_id,
impersonated_service_account
)
meta_data = _GetMetaData(xml_file_path)
os.remove(xml_file_path)
    elif transcript_metadata_flag == 'True' and not metadata_processed:
file_name = "/".join(transcript_uri.split("/")[3:])
json_file_path = f'/tmp/{file_name.split("/")[-1]}'
_DownloadFileFromGcs(
transcript_bucket,
file_name,
json_file_path,
project_id,
impersonated_service_account
)
meta_data = _GetMetaDataFromTranscription(json_file_path)
os.remove(json_file_path)
      if agent_id and agent_id != 'None':
        meta_data['agent_id'] = agent_id
    elif agent_id and agent_id != 'None' and not metadata_processed:
      meta_data['agent_id'] = agent_id
conversation_name = _CreateInsightsConversation(
insights_endpoint,
api_version,
project_id,
None,
transcript_uri,
meta_data,
impersonated_service_account,
medium,
agent_channel=agent_channel,
)
conversation_names.append(conversation_name)
# Sleep to avoid exceeding Insights quota.
time.sleep(1)
return conversation_names
def _ImportConversationsFromAudio(
audio_uris,
encoding,
language_code,
sample_rate_hertz,
project_id,
dest_bucket,
insights_endpoint,
api_version,
should_redact,
agent_id,
impersonated_service_account,
agent_channel,
xml_bucket,
):
"""Create conversations in Insights for a list of audio uris.
Args:
audio_uris: The audio uris for which conversations should be created.
encoding: The language encoding for Speech-to-text.
language_code: The language code for Speech-to-text.
sample_rate_hertz: The sample rate of the audios.
project_id: The project ID (not number) to use for transcription,
redaction, and Insights.
dest_bucket: The destination GCS bucket for transcriptions.
insights_endpoint: The Insights endpoint to send requests.
api_version: The Insights API version to use.
should_redact: Whether to redact transcriptions with DLP.
agent_id: An agent identifier to attach to the conversations.
impersonated_service_account: The service account to impersonate.
agent_channel: The Agent Track.
    xml_bucket: Bucket where XML files are present for metadata ingestion.
Returns:
A list of conversations IDs for the created conversations.
"""
conversation_names = []
operations = []
for audio_uri in audio_uris:
operation = _TranscribeAsync(
audio_uri,
encoding,
language_code,
sample_rate_hertz,
impersonated_service_account,
)
if not operation:
continue
operation.add_done_callback(
# pylint: disable=too-many-function-args
_GetTranscribeAsyncCallback(
project_id,
dest_bucket,
audio_uri,
insights_endpoint,
api_version,
should_redact,
agent_id,
conversation_names,
impersonated_service_account,
agent_channel,
xml_bucket,
)
)
operations.append(operation)
# Sleep to avoid exceeding Speech-to-text quota.
time.sleep(2)
num_operations = len(operations)
print(
'Successfully scheduled `{}` transcription operations.'.format(
num_operations
)
)
while operations:
operations = [operation for operation in operations if not operation.done()]
if not operations:
break
print(
'Still waiting for `{}` transcription operations to complete'.format(
len(operations)
)
)
# Sleep to avoid exceeding Speech-to-text quota.
time.sleep(5)
print(
'Finished waiting for all `{}` transcription operations.'.format(
num_operations
)
)
return conversation_names
def _AnalyzeConversations(
conversation_names,
insights_endpoint,
api_version,
impersonated_service_account,
):
"""Analyzes the provided list of conversations.
Args:
conversation_names: The list of conversations to analyze.
insights_endpoint: The Insights endpoint to call.
api_version: The Insights API version to use.
impersonated_service_account: The service account to impersonate.
"""
analysis_operations = []
oauth_token = _GetOauthToken(impersonated_service_account)
headers = {
'charset': 'utf-8',
'Content-type': 'application/json',
'Authorization': 'Bearer {}'.format(oauth_token),
}
for conversation_name in conversation_names:
try:
url = 'https://{}/{}/{}/analyses'.format(
insights_endpoint, api_version, conversation_name
)
r = requests.post(url, headers=headers)
print('Started analysis for conversation `{}`'.format(conversation_name))
# Sleep to avoid exceeding Create Analysis quota in Insights.
time.sleep(2)
if r.status_code == requests.codes.ok:
analysis_operations.append(r.json()['name'])
else:
r.raise_for_status()
except requests.exceptions.HTTPError as e:
print(
'Error `{}`: failed to create analysis for conversation `{}`.'.format(
e, conversation_name
)
)
print(
'Successfully scheduled `{}` analysis operations: {}'.format(
len(analysis_operations), analysis_operations
)
)
  finished_operations = []
  while len(finished_operations) < len(analysis_operations):
    for analysis_operation in analysis_operations:
      if analysis_operation in finished_operations:
        continue
      try:
        url = 'https://{}/{}/{}'.format(
            insights_endpoint, api_version, analysis_operation
        )
        r = requests.get(url, headers=headers)
        if r.status_code == requests.codes.ok:
          operation_response = r.json()
        else:
          operation_response = {}
        if operation_response.get('done'):
          finished_operations.append(analysis_operation)
        else:
          r.raise_for_status()
      except requests.exceptions.HTTPError as e:
        print(
            'Error `{}`: failed to poll analysis operation `{}`.'.format(
                e, analysis_operation
            )
        )
      time.sleep(1)
    time.sleep(5)
print(
'All `{}` analysis operations have finished.'.format(
len(analysis_operations)
)
)
def _GetProcessedTranscripts(project_id):
"""Returns transcripts uri which are processed.
Args:
project_id: GCP Project Id
Returns:
transcripts: Transcripts uri which are processed
"""
transcripts = []
request = contact_center_insights_v1.ListConversationsRequest(
parent=f'projects/{project_id}/locations/us-central1',
)
client = contact_center_insights_v1.ContactCenterInsightsClient()
page_result = client.list_conversations(request=request)
for response in page_result:
# print(response.data_source.gcs_source.transcript_uri)
transcript = response.data_source.gcs_source.transcript_uri
if transcript not in transcripts:
transcripts.append(response.data_source.gcs_source.transcript_uri)
return transcripts
def _RemoveProcessedFiles(audio_uris, processed_audio_files):
"""Removes the processed files from audio_uris.
Args:
audio_uris: Audio files to process
processed_audio_files: Processed audio files names
Returns:
unprocessed_audio_uris: Files which are left to process
"""
unprocessed_audio_uris = []
for file in audio_uris:
if file.split('/')[-1].split('.')[0] not in processed_audio_files:
unprocessed_audio_uris.append(file)
return unprocessed_audio_uris
def main():
pargs = _ParseArgs()
project_id = pargs.project_id
impersonated_service_account = pargs.impersonated_service_account
insights_endpoint = pargs.insights_endpoint
api_version = pargs.insights_api_version
should_redact = pargs.redact
agent_id = pargs.agent_id
agent_channel = int(pargs.agent_channel)
xml_bucket = pargs.xml_gcs_bucket
analyze_conv = pargs.analyze
folder_name = pargs.folder_name
if pargs.source_local_audio_path or pargs.source_audio_gcs_bucket:
# Inputs are audio files.
dest_bucket = pargs.dest_gcs_bucket
if pargs.source_local_audio_path:
source_local_audio_path = pargs.source_local_audio_path
source_audio_base_name = os.path.basename(source_local_audio_path)
_UploadFileToGcs(
dest_bucket,
source_local_audio_path,
source_audio_base_name,
project_id,
impersonated_service_account,
)
audio_uris = [_GetGcsUri(dest_bucket, source_audio_base_name)]
elif pargs.source_audio_gcs_bucket:
audio_uris = _GetGcsUris(
pargs.source_audio_gcs_bucket,
project_id,
impersonated_service_account,
)
# print(audio_uris)
else:
audio_uris = []
encoding = pargs.encoding
language_code = pargs.language_code
sample_rate_hertz = pargs.sample_rate_hertz
# get processed audio files
processed_audio_files = _GetGcsUris(
dest_bucket, project_id, impersonated_service_account, uri=False
)
processed_audio_files = list(
map(lambda x: x.split('.')[0], processed_audio_files)
)
# remove processed files
audio_uris = _RemoveProcessedFiles(audio_uris, processed_audio_files)
if not audio_uris:
print('No audio file to process')
conversation_names = []
else:
conversation_names = _ImportConversationsFromAudio(
audio_uris,
encoding,
language_code,
sample_rate_hertz,
project_id,
dest_bucket,
insights_endpoint,
api_version,
should_redact,
agent_id,
impersonated_service_account,
agent_channel,
xml_bucket,
)
else:
# Inputs are transcript files.
if pargs.source_voice_transcript_gcs_bucket:
medium = 1
print(pargs.source_voice_transcript_gcs_bucket)
transcript_bucket = pargs.source_voice_transcript_gcs_bucket
elif pargs.source_chat_transcript_gcs_bucket:
print(pargs.source_chat_transcript_gcs_bucket)
transcript_bucket = pargs.source_chat_transcript_gcs_bucket
medium = 2
else:
      print('Provide at least one source bucket (audio, voice transcript, or '
            'chat transcript).')
return
transcript_metadata_flag = pargs.transcript_metadata_flag
transcript_uris = _GetGcsUris(transcript_bucket, project_id,
impersonated_service_account, folder_name)
# get processed transcripts
processed_transcripts_uris = _GetProcessedTranscripts(project_id)
unprocessed_transcript_uris = []
# filter processed transcripts
if should_redact == 'True':
for transcript_uri in transcript_uris:
redacted_uri = _AddRedactedFolderToGcsUri(transcript_uri)
if redacted_uri not in processed_transcripts_uris:
unprocessed_transcript_uris.append(transcript_uri)
else:
unprocessed_transcript_uris = [
i for i in transcript_uris if i not in processed_transcripts_uris
]
    print('Total unprocessed transcripts', len(unprocessed_transcript_uris))
with open('transcripts_uris', 'wb') as fp:
pickle.dump(transcript_uris, fp)
    if not unprocessed_transcript_uris:
print('No transcript to ingest')
conversation_names = []
else:
conversation_names = _ImportConversationsFromTranscript(
unprocessed_transcript_uris, project_id, medium, insights_endpoint,
api_version, should_redact, agent_id,
impersonated_service_account, agent_channel, xml_bucket,
transcript_bucket, transcript_metadata_flag)
print(
'Created `{}` conversation IDs: {}'.format(
len(conversation_names), conversation_names
)
)
if analyze_conv == 'True':
print('Starting analysis for conversations.')
_AnalyzeConversations(
conversation_names,
insights_endpoint,
api_version,
impersonated_service_account,
)
if __name__ == '__main__':
main()