in infra-as-code/modules/ingest-pipeline/cf-ccai-conversation-upload/insights_uploader.py [0:0]
def upload_insights_conversation(self, gcs_transcript_uri, metadata, gcs_audio_uri):
"""
Uploads a conversation to Contact Center AI Insights.
Args:
gcs_transcript_uri (str): The GCS URI of the transcript file.
metadata (dict): Metadata associated with the conversation.
gcs_audio_uri (str): The GCS URI of the audio file.
Returns:
str: The operation name for the upload request.
Raises:
Exception: If an error occurs during the upload.
"""
upload_conversation_url = (
'https://{}/{}/projects/{}/locations/{}/conversations:upload'
).format(self.insights_endpoint, self.insights_api_version, self.ccai_insights_project_id, self.ccai_insights_location_id)
headers = {
'charset': 'utf-8',
'Content-type': 'application/json',
'Authorization': 'Bearer {}'.format(self.oauth_token),
}
data = dict()
source = {'data_source': {'gcs_source': {'transcript_uri': gcs_transcript_uri, 'audio_uri': gcs_audio_uri} } }
data['conversation'] = metadata | source if metadata else source
data['conversation']['call_metadata'] = {'agent_channel':1, 'customer_channel': 2}
data['conversationId'] = gcs_audio_uri.split('/')[-1].replace('.flac', '')
r = requests.post(upload_conversation_url, headers=headers, json=data)
if r.status_code == requests.codes.ok:
print('Status ok')
operation = r.json()
operation_name = r.json()['name']
return operation_name
else:
print('Status not ok')
try:
r.raise_for_status()
except requests.exceptions.RequestException as e:
print(f'An error occurred running the CCAI Insights upload conversation operation: {e.response.text}')
raise Exception(f'{e.response.text}')