infra-as-code/modules/ingest-pipeline/cf-feedback-generator/lib.py (198 lines of code) (raw):

import json
import os
from datetime import datetime

import google.auth
import google.auth.transport.requests
import google.cloud.logging
import requests
import vertexai
from google.cloud import bigquery
from google.cloud import storage
from vertexai.generative_models import GenerativeModel, GenerationConfig

from record import RecordKeeper


class CoachingFeedbackGenerator:
    """Generates coaching feedback for one CCAI Insights conversation.

    The generator downloads the conversation from the CCAI Insights REST API,
    selects the scorecard answers that need coaching, asks a Vertex AI
    generative model for feedback, stores every feedback row in BigQuery and
    records the outcome through a ``RecordKeeper``.
    """

    # Upper bound (seconds) for each CCAI Insights REST call so that a stalled
    # connection cannot block the function forever.
    REQUEST_TIMEOUT_SECONDS = 60

    def __init__(self, project_id, location_id, model_name, insights_endpoint,
                 insights_api_version, ccai_insights_location_id,
                 conversation_id, dataset_name, table_name, scorecard_id,
                 ingest_record_bucket_id, target_tags, target_values):
        """Stores the configuration and creates the API clients.

        Args:
            project_id (str): Google Cloud project id
            location_id (str): Vertex AI location
            model_name (str): name of the generative model to use
            insights_endpoint (str): host name of the CCAI Insights API
            insights_api_version (str): CCAI Insights API version
            ccai_insights_location_id (str): CCAI Insights location
            conversation_id (str): id of the conversation to coach
            dataset_name (str): BigQuery dataset receiving the feedback
            table_name (str): BigQuery table receiving the feedback
            scorecard_id (str): id of the QA scorecard
            ingest_record_bucket_id (str): bucket passed to the RecordKeeper
            target_tags (str): comma separated tags selecting the questions
            target_values (str): comma separated answer values to exclude
        """
        self.project_id = project_id
        self.location_id = location_id
        self.model_name = model_name
        self.insights_endpoint = insights_endpoint
        self.insights_api_version = insights_api_version
        self.ccai_insights_location_id = ccai_insights_location_id
        self.conversation_id = conversation_id

        # The token is derived from the credentials, so the order matters.
        self.creds = self.get_credentials()
        self.oauth_token = self.get_oauth_token()

        vertexai.init(project=self.project_id, location=self.location_id,
                      credentials=self.creds)
        self.model = GenerativeModel(model_name)

        self.dataset_name = dataset_name
        self.table_name = table_name
        self.scorecard_id = scorecard_id
        self.ingest_record_bucket_id = ingest_record_bucket_id

        # "a, b" must yield ["a", "b"]: surrounding blanks and empty entries
        # would otherwise never match a tag or an answer value.
        self.target_tags = self._split_csv(target_tags)
        self.target_values = self._split_csv(target_values)

        self.storage_client = storage.Client(project=self.project_id,
                                             credentials=self.creds)

    @staticmethod
    def _split_csv(raw_value):
        """Splits a comma separated string into stripped, non-empty items."""
        return [item.strip() for item in raw_value.split(",") if item.strip()]

    @staticmethod
    def _answer_tokens(value):
        """Returns the lower-case spellings under which an answer may be configured."""
        tokens = {str(value).strip().lower()}
        if isinstance(value, bool):
            # A JSON boolean may be configured as true/false, 1/0 or yes/no.
            tokens.update(('1', 'yes') if value else ('0', 'no'))
        return tokens

    def get_credentials(self):
        """Returns the application default credentials (cloud-platform scope)."""
        creds, _ = google.auth.default(
            scopes=['https://www.googleapis.com/auth/cloud-platform'])
        return creds

    def get_oauth_token(self):
        """Refreshes ``self.creds`` and returns the resulting access token."""
        auth_req = google.auth.transport.requests.Request()
        self.creds.refresh(auth_req)
        return self.creds.token

    def _get_json(self, url):
        """Performs an authenticated GET and returns the decoded JSON body.

        Raises:
            requests.RequestException: on timeout, connection failure or a
                non-2xx response, so that an error payload is never mistaken
                for a valid resource.
        """
        headers = {
            'charset': 'utf-8',
            'Content-type': 'application/json',
            'Authorization': 'Bearer {}'.format(self.oauth_token),
        }
        response = requests.get(url, headers=headers,
                                timeout=self.REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        return response.json()

    def get_transcript(self):
        """Calls the CCAI endpoint to download the transcript

        Returns:
            dict: CCAI stored conversation

        Raises:
            requests.RequestException: if the request fails or the API
                answers with an error status
        """
        ccai_conversation_url = (
            'https://{}/{}/projects/{}/locations/{}/conversations/{}'
        ).format(self.insights_endpoint, self.insights_api_version,
                 self.project_id, self.ccai_insights_location_id,
                 self.conversation_id)
        return self._get_json(ccai_conversation_url)

    def log_error(self, exception_message):
        """Logs an error in Cloud Logging

        Args:
            exception_message (str): Exception error message
        """
        client = google.cloud.logging.Client(project=self.project_id,
                                             credentials=self.creds)
        logger = client.logger(name="cf_feedback_generator_logger")

        entry = {
            'message': 'An error ocurred when using GenAI to generate coaching feedback',
            'exception_message': exception_message,
            'conversation_id': self.conversation_id,
            'ccai_insights_endpoint': self.insights_endpoint,
            'ccai_insights_version': self.insights_api_version,
            'ccai_insights_location': self.ccai_insights_location_id,
        }

        logger.log_struct(entry, severity="ERROR",)
        print('Error logged')

    def get_latest_revision(self):
        """Gets the latest version of the scorecard for QAI

        The lookup is best effort: any failure is printed and ``None`` is
        returned instead of raising.

        Returns:
            str: revision id, or None when it could not be determined
        """
        try:
            url = (
                'https://{}/{}/projects/{}/locations/{}/qaScorecards/{}/revisions'
            ).format(self.insights_endpoint, self.insights_api_version,
                     self.project_id, self.ccai_insights_location_id,
                     self.scorecard_id)
            response_body = self._get_json(url)

            most_recent_revision = max(
                response_body['qaScorecardRevisions'],
                key=lambda x: datetime.fromisoformat(
                    x['createTime'].replace('Z', '+00:00'))
            )
            # The resource name ends with the revision id.
            return most_recent_revision['name'].split("/")[-1]
        except Exception as e:
            print("An error occurred while requesting the latest revision ID: {}".format(e))
            return None

    def get_qa_questions(self):
        """Gets all the questions from the scorecard

        Returns:
            dict: dictionary with the questions as key and the instructions as values

        Raises:
            RuntimeError: if the latest scorecard revision is unknown
            requests.RequestException: if the request fails or the API
                answers with an error status
        """
        latest_revision_id = self.get_latest_revision()
        if latest_revision_id is None:
            # Without this guard the literal "None" ends up in the URL and the
            # failure surfaces later as an unrelated KeyError.
            raise RuntimeError(
                'Could not determine the latest revision of scorecard {}'.format(
                    self.scorecard_id))

        qa_questions_url = (
            'https://{}/{}/projects/{}/locations/{}/qaScorecards/{}/revisions/{}/qaQuestions'
        ).format(self.insights_endpoint, self.insights_api_version,
                 self.project_id, self.ccai_insights_location_id,
                 self.scorecard_id, latest_revision_id)
        response_body = self._get_json(qa_questions_url)

        qa_questions = {}
        for question in response_body['qaQuestions']:
            # A question without instructions gets an empty string rather
            # than aborting the run with a KeyError.
            instructions = question.get('answerInstructions', '')
            qa_questions[question['questionBody']] = {
                "instructions": str(instructions)
            }
        return qa_questions

    def extract_questions(self, qa_questions):
        """Selects the scorecard answers that need coaching feedback.

        An answer is selected when it carries at least one of
        ``self.target_tags``, is not marked as not-applicable and its boolean
        value is not one of ``self.target_values``.

        Args:
            qa_questions (list): QA answers of the conversation analysis

        Returns:
            Tuple(dict, list): Dictionary of feedback structure and list of questions.
        """
        # target_values holds strings while boolValue is a JSON boolean, so
        # both sides are compared through their normalised spellings.
        excluded_tokens = {str(value).strip().lower()
                           for value in self.target_values}

        results = []
        questions_list = []
        for answer in qa_questions:
            tags = answer.get('tags', [])
            if not any(tag in tags for tag in self.target_tags):
                continue

            answer_value = answer['answerValue']
            if 'naValue' in answer_value:
                continue

            bool_value = answer_value.get('boolValue')
            if bool_value is None:
                # Only yes/no answers are coached; other answer kinds are
                # skipped instead of failing the whole conversation.
                continue
            if self._answer_tokens(bool_value) & excluded_tokens:
                continue

            results.append({
                'conversation_id': answer['conversation'],
                'question_id': answer['qaQuestion'],
                'question': answer['questionBody'],
                'feedback': None
            })
            questions_list.append(answer['questionBody'])

        questions_feedback = {'results': results}
        return questions_feedback, questions_list

    def generate_prompt(self, transcript, subset_questions, questions_feedback):
        """Builds the coaching prompt sent to the generative model.

        Args:
            transcript (list): transcript segments of the CCAI conversation
            subset_questions (dict): dictionary with the questions and instructions
            questions_feedback (dict): dictionary with the format of the feedback output

        Returns:
            str: prompt text
        """
        prompt = f"""
        <OBJECTIVE_AND_PERSONA>
        You are a customer service coach specializing in support programs
        Your primary role is to analyze customer interactions (provided as text transcripts) and provide constructive feedback to agents.
        Your goal is to help agents improve their communication skills, performance and adherence to company policies while maintaining a supportive and motivating tone.
        </OBJECTIVE_AND_PERSONA>

        <INSTRUCTIONS>
        You will receive:
        - A redacted Transcript in JSON format
        - Coaching Questions (rubric) in JSON format

        Your job is to:
        1. Carefully read and analyze the entire transcript.
        2. For each coaching question:
            - Review the specific instructions and use the given rubric as base to generate the coaching feedback.
            - Assume the agent did not meet the criteria (received a negative score).
            - Identify specific moments in the transcript where improvement is needed.
        3. Create comprehensive coaching feedback for each question that:
            - Identifies the specific gap between current performance and expected standards.
            - Provides clear examples from the transcript of what could have been done differently.
            - Offers step-by-step guidance on how to improve

        When giving feedback:
        - **Be specific**: Focus on particular interactions or phrases that need improvement.
        - ** Be relevant**: Focus on the context of the call and offer suggest improvments that can improve the caller experience
        - **Be constructive**: Offer actionable suggestions that the agent can implement immediately to improve their performance.
        - **Focus on improvement**: Highlight patterns of behavior and suggest practical training or practice techniques where necessary.
        - **Be short and to the point**: Provide the feedback in not more than 2 sentences per question which should inolve all the needed information in a clear manner

        4. Preserve the original Feedback JSON structure, including all key-value pairs, nesting, and formatting. Only modify the "feedback" key to add the feedback. Do not change "question", "question_id".
        </INSTRUCTIONS>

        <CONTEXT>
        Conversation topics:
        - Welcome call (patient onboarding): To welcome and onboard the new patient to the Program
        - Coverage determination communications : To provide the patient with information about their insurance coverage determination
        - Device training : To be used to preview the device training program for patients already on therapy
        - Restart : To re-enroll eligible patients back into the Program
        </CONTEXT>

        <INPUTS>
        Transcript: {transcript}
        Coaching Questions: {subset_questions}
        </INPUTS>

        <OUTPUT>
        Feedback: {questions_feedback}
        </OUTPUT>

        <VALIDATION>
        Before providing any response:
        1. Verify the request aligns with coaching mission
        2. Ensure response meets all formatting requirements
        3. Confirm feedback addresses score improvement
        4. Ensure feedback is relevant to the context of the call
        5.If unable to meet these criteria, respond with "I am not able to answer this question"
        </VALIDATION>
        """
        return prompt

    def generate_coaching_feedback(self, prompt, response_schema):
        """Calls VertexAI to get a response for coaching feedback

        Args:
            prompt (str): prompt to use with Gemini
            response_schema (dict): dictionary with the expected schema output

        Returns:
            dict: dictionary with the gemini response
        """
        contents = [prompt]
        generation_config = GenerationConfig(
            temperature=0,
            response_mime_type="application/json",
            response_schema=response_schema)
        response = self.model.generate_content(
            contents, generation_config=generation_config)
        print(f"respuesta a generate_coaching_feedback {response}")
        return json.loads(response.text)

    def insert_feedback_to_bigquery(self, json_data):
        """Inserts the feedback generated in BigQuery

        Args:
            json_data (dict): one feedback entry of the gemini response

        Returns:
            bool: True when the row was stored, False when the insert failed
        """
        client = bigquery.Client(project=self.project_id,
                                 credentials=self.creds)
        # A fully qualified table id replaces the deprecated client.dataset().
        table = client.get_table('{}.{}.{}'.format(
            self.project_id, self.dataset_name, self.table_name))

        # Convert JSON to rows format expected by BigQuery
        rows_to_insert = [{
            'conversationName': json_data['conversation_id'],
            'qaQuestion': json_data['question_id'],
            'feedback': json_data['feedback']
        }]

        try:
            errors = client.insert_rows(table, rows_to_insert)
        except Exception as e:
            print(f'Error occurred: {str(e)}')
            return False

        if errors:
            print('Errors occurred while inserting data:', errors)
            return False
        print('Data inserted successfully')
        return True

    @staticmethod
    def _build_response_schema():
        """Returns a fresh copy of the JSON schema expected from the model."""
        return {
            "type": "object",
            "properties": {
                "results": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "conversation_id": {"type": "string"},
                            "question_id": {"type": "string"},
                            "question": {"type": "string"},
                            "feedback": {"type": "string"}
                        },
                        # conversation_id is read when the row is written to
                        # BigQuery, so the model must always return it.
                        "required": ["conversation_id", "question_id",
                                     "question", "feedback"]
                    },
                    # NOTE(review): "required" on an array schema looks
                    # misplaced; kept as in the original, confirm and remove.
                    "required": ["items"]
                }
            },
            "required": ["results"]
        }

    def run(self):
        """Runs the functions to get the transcript, questions and instructions to prompt gemini

        Failures after the record keeper is created are logged and stored as
        an error record instead of being raised.

        Returns:
            dict: coaching feedback, or None when the generation failed
        """
        conversation = self.get_transcript()
        original_file_name = conversation['labels']['original_file_name']
        self.record_keeper = RecordKeeper(self.ingest_record_bucket_id,
                                          original_file_name,
                                          self.storage_client)

        try:
            transcript = conversation['transcript']['transcriptSegments']
            analysis_metadata = conversation['latestAnalysis']['analysisResult']['callAnalysisMetadata']
            qa_answers = analysis_metadata['qaScorecardResults'][0]['qaAnswers']
            print(qa_answers)

            questions_feedback, qa_questions_list = self.extract_questions(qa_answers)

            if not qa_questions_list:
                # Nothing needs coaching: skip the scorecard lookup and the
                # model call and record the conversation as processed.
                print(questions_feedback)
                print(qa_questions_list)
                self.record_keeper.replace_row(
                    self.record_keeper.create_processed_record())
                return {'results': []}

            scorecard = self.get_qa_questions()
            subset_questions = {key: scorecard[key]
                                for key in qa_questions_list
                                if key in scorecard}

            prompt = self.generate_prompt(transcript, subset_questions,
                                          questions_feedback)
            coaching = self.generate_coaching_feedback(
                prompt, self._build_response_schema())

            print(questions_feedback)
            print(qa_questions_list)
            print(coaching['results'])

            failed_inserts = 0
            for question_feedback in coaching['results']:
                if not self.insert_feedback_to_bigquery(question_feedback):
                    failed_inserts += 1
            if failed_inserts:
                # Never mark the conversation as processed when feedback rows
                # were lost; reuse the error path below instead.
                raise RuntimeError(
                    '{} of {} feedback rows could not be inserted into BigQuery'.format(
                        failed_inserts, len(coaching['results'])))

            self.record_keeper.replace_row(
                self.record_keeper.create_processed_record())
            print(f"respuesta a generate_coaching_feedback {coaching}")

            return coaching
        except Exception as e:
            self.log_error(str(e))
            self.record_keeper.replace_row(
                self.record_keeper.create_error_record(f'An error ocurred when using GenAI to generate coaching feedback: {str(e)}'))
            return None