notebooks/genai_colab_lab1and2.ipynb (1,246 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "EEeUZ1r79kr0"
},
"source": [
"# Setup Environment\n",
"The following code loads the environment variables required to run this notebook.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "iqND-S5P9kr0"
},
"outputs": [],
"source": [
"FILE=\"GenAI Lab 1 and 2\"\n",
"\n",
"! pip install -qqq git+https://github.com/elastic/notebook-workshop-loader.git@main\n",
"from notebookworkshoploader import loader\n",
"import os\n",
"from dotenv import load_dotenv\n",
"\n",
"if os.path.isfile(\"../env\"):\n",
" load_dotenv(\"../env\", override=True)\n",
" print('Successfully loaded environment variables from local env file')\n",
"else:\n",
" loader.load_remote_env(file=FILE, env_url=\"https://notebook-workshop-api-voldmqr2bq-uc.a.run.app\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Sm8uPLZxNOF3"
},
"source": [
"# Lab 1-1: Using Transformer Models\n",
"\n",
"In this lab we will\n",
"* Intro to Google Colab - Hello World, importing python libraries\n",
"* Caching the download of a smaller LLM\n",
"* Using a basic transformer models locally\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "HqlXsCGMN6TC"
},
"source": [
"## Step 1: Hit play on the next code **sample**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "r0WRcBgDNOF3"
},
"outputs": [],
"source": [
"print(\"Hello World\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "P85HsyKmNOF4"
},
"source": [
"## Step 2: Use ! to execute a shell command"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "sfW6H9meNOF4"
},
"outputs": [],
"source": [
"! echo \"The shell thinks the Current Directory is: $(pwd)\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "lSXuTJelNOF5"
},
"source": [
"## Step 3: Environment setup\n",
"\n",
"First let us import some Python libraries we'll use in the first lab module."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "FivGgCcXNOF5"
},
"outputs": [],
"source": [
"! pip install -qqq --upgrade pip\n",
"! pip install -qqq torch==2.1.0+cu121 -f https://download.pytorch.org/whl/torch_stable.html\n",
"! pip install -qqq --upgrade transformers==4.36.2\n",
"! pip install -qqq python-dotenv==1.0.0\n",
"! pip install -qqq tiktoken==0.5.2 cohere==4.38 openai==1.3.9 ## for later in the lab"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Kk2GXx6ANOF5"
},
"source": [
"## Step 4: Utility functions\n",
"Some utility functions that are good to keep on hand"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "0kVJFIJxNOF5"
},
"outputs": [],
"source": [
"import json\n",
"# pretty printing JSON objects\n",
"def json_pretty(input_object):\n",
" print(json.dumps(input_object, indent=4))\n",
"\n",
"\n",
"import textwrap\n",
"# wrap text when printing, because colab scrolls output to the right too much\n",
"def wrap_text(text, width):\n",
" wrapped_text = textwrap.wrap(text, width)\n",
" return '\\n'.join(wrapped_text)\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ZIPsPhYuNOF5"
},
"source": [
"## Step 5: Download sentiment analysis model from HuggingFace\n",
"\n",
"We'll use the Huggingface Transformer library to download and ready an Open Source model called DistilBERT which can be used for sentiment analysis.\n",
"\n",
"* Details of the model can be found on its [Hugging Face Page](https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english)\n",
"* This model is pretrained to determine if an input text is of *POSITIVE* or *NEGATIVE* sentiment and makes a good intro example to AI models.\n",
"* Note we are caching the model files in a folder called ```llm_download_cache``` which will help us not have to re-download the files again within the connection to this runtime. You can see the download in the filesystem (using the left hand side menu)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Rg4o4viHNOF5"
},
"outputs": [],
"source": [
"import torch\n",
"import tqdm\n",
"import json\n",
"import os\n",
"from transformers import (pipeline,\n",
" DistilBertTokenizer,\n",
" DistilBertForSequenceClassification)\n",
"\n",
"# Set the cache directory\n",
"cache_directory = \"llm_download_cache\"\n",
"\n",
"# Create the cache directory if it doesn't exist\n",
"if not os.path.exists(cache_directory):\n",
" os.makedirs(cache_directory)\n",
"\n",
"model_id = \"distilbert-base-uncased-finetuned-sst-2-english\"\n",
"sentiment_tokenizer = DistilBertTokenizer.from_pretrained(\n",
" model_id, cache_dir=cache_directory)\n",
"sentiment_model = DistilBertForSequenceClassification.from_pretrained(\n",
" model_id, cache_dir=cache_directory)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "1N1DeQ4SNOF5"
},
"source": [
"\n",
"## Step 6: Run sentiment analysis\n",
"Okay! let's run the model ```sentiment_model``` on two pieces of sample text."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "KfJ4WqGLNOF6"
},
"outputs": [],
"source": [
"## With the distilbert model downloaded and cached we can call it for\n",
"## sentiment analysis\n",
"\n",
"# Define the sentiment analysis pipeline\n",
"sentiment_classifier = pipeline(\"sentiment-analysis\",\n",
" model=sentiment_model,\n",
" tokenizer=sentiment_tokenizer,\n",
" device='cpu')\n",
"#two samples\n",
"classifier_results = sentiment_classifier([\n",
" \"My dog is so cute, I love him.\",\n",
"\n",
" \"I am very sorry to inform you that the tax\\\n",
" administration has decided to audit you.\"\n",
"])\n",
"\n",
"json_pretty(classifier_results)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "zLn39tLNNOF6"
},
"source": [
"### 🫵 Try it yourself - Get Creative 🫵\n",
"Try some of your own examples.\n",
"Note, AI models are subject to bias. The model card for this model goes into pretty good detail on the issue. [Read more here](https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english#risks-limitations-and-biases)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "NYS-5TMPNOF6"
},
"outputs": [],
"source": [
"your_classifier_results = sentiment_classifier([\n",
" \"CHANGE ME\",\n",
" \"CHANGE ME\"\n",
"])\n",
"json_pretty(your_classifier_results)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "n5f1vIeeNOF6"
},
"source": [
"## Step 7: Generative LLM - Simple and Local - Download Flan T5\n",
"\n",
"Let's start with the Hello World of generative AI examples: completing a sentence. For this we'll install a fine tuned Flan-T5 variant model. ([LaMini-T5 ](https://huggingface.co/MBZUAI/LaMini-T5-738M))\n",
"\n",
"Note, while this is a smaller checkpoint of the model, it is still a 3GB download. We'll cache the files in the same folder.\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "l6kqG5siNOF6"
},
"outputs": [],
"source": [
"## Let's play with something a little bigger that can do a text completion\n",
"## This is a 3 GB download and takes some RAM to run, but it works CPU only\n",
"\n",
"from transformers import pipeline\n",
"from transformers import AutoTokenizer, AutoModelForSeq2SeqLM\n",
"\n",
"# model_name = \"MBZUAI/LaMini-Flan-T5-77M\"\n",
"# model_name = \"MBZUAI/LaMini-T5-223M\"\n",
"model_name = \"MBZUAI/LaMini-T5-738M\"\n",
"\n",
"llm_tokenizer = AutoTokenizer.from_pretrained(model_name,\n",
" cache_dir=cache_directory)\n",
"llm_model = AutoModelForSeq2SeqLM.from_pretrained(model_name,\n",
" cache_dir=cache_directory)\n",
"\n",
"llm_pipe = pipeline(\n",
" \"text2text-generation\",\n",
" model=llm_model,\n",
" tokenizer=llm_tokenizer,\n",
" max_length=100\n",
" )\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "RSCILNELNOF6"
},
"source": [
"## Step 8: Generate text completions, watch for Hallucinations"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Gxpdh9UtNOF7"
},
"outputs": [],
"source": [
"countries = [\n",
" \"United Kingdom\",\n",
" \"France\",\n",
" \"People's Republic of China\",\n",
" \"United States\",\n",
" \"Ecuador\",\n",
" \"Freedonia\", ## high hallucination potential\n",
" \"Faketopia\" ## high hallucination potential\n",
" ]\n",
"\n",
"for country in countries:\n",
" input_text = f\"The capital of the {country} is\"\n",
" output = llm_pipe(input_text)\n",
" completed_sentence = f\"\\033[94m{input_text}\\033[0m {output[0]['generated_text']}\"\n",
" print(completed_sentence)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "rpsbegewNOF7"
},
"source": [
"### 🫵 Try it yourself - Get Creative 🫵\n",
"Try some of your own examples.\n",
"This thing isn't super smart without fine tuning, but it can handle some light context injection and prompt engineering. We'll learn more about those subjects in later modules.\n",
"\n",
"Notice the difference between asking a specific question and phrasing a completion\n",
"* \"Who is the Prime Minister of the UK?\"\n",
"* \"The current Prime Minister of the united kingdom is \""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "N6GhGplrNOF7"
},
"outputs": [],
"source": [
"prompt_text = \"The current Prime Minister of the United Kingdom is\" ## high stale data potential\n",
"output = llm_pipe(prompt_text)\n",
"completed_prompt = f\"\\033[94m{prompt_text}\\033[0m {output[0]['generated_text']}\"\n",
"print(completed_prompt)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5bvjzCiANOF7"
},
"source": [
"🛑 Stop Here 🛑\n",
"\n",
"This Ends Lab 1-1\n",
"<hr/>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "_y2q5sxMNOF7"
},
"source": [
"# Lab 2-1: Prompts and Basic Chatbots\n",
"\n",
"* Using langchain with local LLM\n",
"* Connect to Open AI\n",
"* Using a memory window to create a txt-only GPT conversation"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cRBAXFBiNOF7"
},
"source": [
"## Step 1: Using the OpenAI python library\n",
"\n",
"❗ Note: if you restarted your google Colab, you may need to re-run the first stup step back and the very top before coming back here ❗"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Mu56DHbbNOF7"
},
"outputs": [],
"source": [
"import os, secrets, requests\n",
"import openai\n",
"from openai import OpenAI\n",
"from requests.auth import HTTPBasicAuth\n",
"\n",
"#if using the Elastic AI proxy, then generate the correct API key\n",
"if os.environ['ELASTIC_PROXY'] == \"True\":\n",
"\n",
" if \"OPENAI_API_TYPE\" in os.environ: del os.environ[\"OPENAI_API_TYPE\"]\n",
"\n",
" #generate and share \"your\" unique hash\n",
" os.environ['USER_HASH'] = secrets.token_hex(nbytes=6)\n",
" print(f\"Your unique user hash is: {os.environ['USER_HASH']}\")\n",
"\n",
" #get the current API key and combine with your hash\n",
" os.environ['OPENAI_API_KEY'] = f\"{os.environ['OPENAI_API_KEY']} {os.environ['USER_HASH']}\"\n",
"else:\n",
" openai.api_type = os.environ['OPENAI_API_TYPE']\n",
" openai.api_version = os.environ['OPENAI_API_VERSION']\n",
"\n",
"openai.api_key = os.environ['OPENAI_API_KEY']\n",
"openai.api_base = os.environ['OPENAI_API_BASE']\n",
"openai.default_model = os.environ['OPENAI_API_ENGINE']"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "qpO5xbubDz_T"
},
"source": [
"## Step 2: Test call to ChatGPT"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ETYE5zfSD2J7"
},
"outputs": [],
"source": [
"# Call the OpenAI ChatCompletion API\n",
"def chatCompletion(messages):\n",
" client = OpenAI(api_key=openai.api_key, base_url=openai.api_base)\n",
" completion = client.chat.completions.create(\n",
" model=openai.default_model,\n",
" max_tokens=100,\n",
" messages=messages\n",
" )\n",
" return completion\n",
"\n",
"def chatWithGPT(prompt, print_full_json=False):\n",
" completion = chatCompletion([{\"role\": \"user\", \"content\": prompt}])\n",
" response_text = completion.choices[0].message.content\n",
"\n",
" if print_full_json:\n",
" print(completion.json())\n",
"\n",
" return wrap_text(response_text,70)\n",
"\n",
"## call it with the json debug output enabled\n",
"response = chatWithGPT(\"Hello, is ChatGPT online and working?\", print_full_json=True)\n",
"\n",
"print(\"\\n\")\n",
"print(response)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "WTyQ4_SFNOF7"
},
"source": [
"\n",
"## Step 3: A conversation loop - ❗ type \"exit\" to end the chat ❗\n",
"Feeding user input in for single questions is easy"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "rgAAKZKkNOF7"
},
"outputs": [],
"source": [
"def hold_a_conversation(ai_conversation_function = chatWithGPT):\n",
" print(\" -- Have a conversation with an AI: \")\n",
" print(\" -- type 'exit' when done\")\n",
"\n",
" user_input = input(\"> \")\n",
" while not user_input.lower().startswith(\"exit\"):\n",
" print(ai_conversation_function(user_input, False))\n",
" print(\" -- type 'exit' when done\")\n",
" user_input = input(\"> \")\n",
" print(\"\\n -- end conversation --\")\n",
"\n",
"## we are passing the previously defined function as a parameter\n",
"hold_a_conversation(chatWithGPT)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "PdjzXPl6NOF7"
},
"source": [
"\n",
"\n",
"## Step 4: See the impact of changing the system prompt\n",
"You can use the system prompt to adjust the AI and it's responses and purpose"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "G9ucvlGqNOF7"
},
"outputs": [],
"source": [
"def pirateGPT(prompt, print_full_json=False):\n",
" system_prompt = \"\"\"\n",
"You are an unhelpful AI named Captain LLM_Beard that talks like a pirate in short responses.\n",
"You acknowledge the user's question but redirect all conversations towards your love of treasure.\n",
"\"\"\"\n",
" completion = chatCompletion([\n",
" {\"role\": \"system\", \"content\": system_prompt},\n",
" {\"role\": \"user\", \"content\": prompt}\n",
" ])\n",
" response_text = completion.choices[0].message.content\n",
" if print_full_json:\n",
" print(completion.json())\n",
"\n",
" return wrap_text(response_text,70)\n",
"\n",
"\n",
"hold_a_conversation(pirateGPT)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "HHiuOMCeNOF8"
},
"source": [
"❗ Note ❗\n",
"\n",
"This isn't a conversation yet because the AI has no memory of past interactions.\n",
"\n",
"Here is an example conversation where it is very clear the AI has no memory of past prompts or completions.\n",
"```txt\n",
"> Hello!\n",
"Hello! How can I assist you today?\n",
"> my favorite color is blue\n",
"That's great! Blue is a very popular color.\n",
"> what is my favorite color?\n",
"I'm sorry, but as an AI, I don't have the ability to know personal\n",
"preferences or favorite colors.\n",
"```\n",
"There are two problems. First, the LLM is stateless and each call is independent. ChatGPT does not remember our previous prompts. Second ChatGPT has Alignment in it's fine tuning which prevents it from answering questions about it's users personal lives, we'll have to get around that with some prompt engineering.\n",
"\n",
"Let's use the past conversation as input to subsequent calls. Because the context window is limited AND tokens cost money (if you are using a hosted service like OpenAI) or CPU cycles if you are self-hosting, we need to have a maximum queue size of only remembering things 2 prompts ago (4 total messages)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "XlbDf77SP3-n"
},
"source": [
"## Step 5: Create a chat with memory"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "MT_aof4aNOF8"
},
"outputs": [],
"source": [
"from collections import deque\n",
"\n",
"class QueueBuffer:\n",
" def __init__(self, max_length):\n",
" self.max_length = max_length\n",
" self.buffer = deque(maxlen=max_length)\n",
"\n",
" def enqueue(self, item):\n",
" self.buffer.append(item)\n",
"\n",
" def dequeue(self):\n",
" if self.is_empty():\n",
" return None\n",
" return self.buffer.popleft()\n",
"\n",
" def is_empty(self):\n",
" return len(self.buffer) == 0\n",
"\n",
" def is_full(self):\n",
" return len(self.buffer) == self.max_length\n",
"\n",
" def size(self):\n",
" return len(self.buffer)\n",
"\n",
" def peek(self):\n",
" return list(self.buffer)\n",
"\n",
"# enough conversation memory for 2 call and response history\n",
"memory_buffer = QueueBuffer(4)\n",
"\n",
"system_prompt = {\n",
" \"role\": \"system\",\n",
" \"content\": \"\"\"\n",
"You are an AI named Cher Horowitz that speaks\n",
"in 1990's valley girl dialect of English.\n",
"You talk to the human and use the past conversation to inform your answers.\"\"\"\n",
" }\n",
"\n",
"## utility function to print in a different color for debug output\n",
"def print_light_blue(text):\n",
" print(f'\\033[94m{text}\\033[0m')\n",
"\n",
"## now with memory\n",
"def cluelessGPT(prompt, print_full_json=False):\n",
"\n",
" ## the API call will use the system prompt + the memory buffer\n",
" ## which ends with the user prompt\n",
" user_message = {\"role\": \"user\", \"content\": prompt}\n",
" memory_buffer.enqueue(user_message)\n",
"\n",
" ## debug print the current AI memory\n",
" print_light_blue(\"Current memory\")\n",
" for m in memory_buffer.peek():\n",
" role = m.get(\"role\").strip()\n",
" content = m.get(\"content\").strip()\n",
" print_light_blue( f\" {role} | {content}\")\n",
"\n",
" ## when calling the AI we put the system prompt at the start\n",
" concatenated_message = [system_prompt] + memory_buffer.peek()\n",
"\n",
" ## here is the request to the AI\n",
"\n",
" completion = chatCompletion(concatenated_message)\n",
" response_text = completion.choices[0].message.content\n",
" if print_full_json:\n",
" print(completion.json())\n",
"\n",
"\n",
" ## don't forget to add the repsonse to the conversation memory\n",
" memory_buffer.enqueue({\"role\":\"assistant\", \"content\":response_text})\n",
"\n",
" if print_full_json:\n",
" json_pretty(completion)\n",
"\n",
" return wrap_text(response_text,70)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NTHELTKTNOF8"
},
"source": [
"#### Step 5: Let's chat with a not so clueless chatbot"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "FVztTDN6NOF8"
},
"outputs": [],
"source": [
"hold_a_conversation(cluelessGPT)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KK4UKihhNOF8"
},
"source": [
"🛑 Stop Here 🛑\n",
"\n",
"This Ends Lab 1-2\n",
"<hr/>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "92luYkkb5aaX"
},
"source": [
"# Lab 2-2: Data Redaction"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "CKKAZragT9ec"
},
"source": [
"## Step 1: Install and Import dependencies"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "pp_RWV73aGAs"
},
"outputs": [],
"source": [
"!pip install -qqq eland==8.11.1 elasticsearch==8.11.1 transformers==4.36.2 sentence-transformers==2.2.2 python-dotenv==1.0.0\n",
"!pip install -qqq elastic-apm==6.20.0\n",
"\n",
"from elasticsearch import Elasticsearch, helpers, exceptions\n",
"from eland.ml.pytorch import PyTorchModel\n",
"from eland.ml.pytorch.transformers import TransformerModel\n",
"from getpass import getpass\n",
"import tempfile\n",
"import os\n",
"from pprint import pprint"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "JXj9kw_5UK_V"
},
"source": [
"## Step 2: Create Elasticsearch Client Connection"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "m-_r6lROasNI"
},
"outputs": [],
"source": [
"if 'ELASTIC_CLOUD_ID' in os.environ:\n",
" es = Elasticsearch(\n",
" cloud_id=os.environ['ELASTIC_CLOUD_ID'],\n",
" api_key=(os.environ['ELASTIC_APIKEY_ID'], os.environ['ELASTIC_APIKEY_SECRET']),\n",
" request_timeout=30\n",
" )\n",
"elif 'ELASTIC_URL' in os.environ:\n",
" es = Elasticsearch(\n",
" os.environ['ELASTIC_URL'],\n",
" api_key=(os.environ['ELASTIC_APIKEY_ID'], os.environ['ELASTIC_APIKEY_SECRET']),\n",
" request_timeout=30\n",
" )\n",
"else:\n",
" print(\"env needs to set either ELASTIC_CLOUD_ID or ELASTIC_URL\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "G9FWxKtef3uu"
},
"source": [
"## Step 3: Monitoring prompts sent through a Proxy\n",
"\n",
"Imagine I have the following question from a customer after a winter storm\n",
"\n",
"> My power was out all last week at my home at 123 Grove street.\n",
"When I talked to my neighbor Jane Lopez, she said she got rebate on her bill.\n",
"Can you do the same for me?\n",
"\n",
"The following is a simulated customer example where we'll use the LLM to answer a customer service case.\n",
"\n",
"We'll learn how to **retrieve** the best call script using semantic search in a later exercise. \n",
"\n",
"**Some organizations would be uncomfortable with customer PII going to a 3rd party service. Who gets an unencrypted version of the prompt?**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ZaucVWDddOuR"
},
"outputs": [],
"source": [
"import elasticapm\n",
"import random\n",
"\n",
"os.environ['ELASTIC_APM_SERVICE_NAME'] = \"genai_workshop_lab_redact\"\n",
"apmclient = elasticapm.Client() \\\n",
" if elasticapm.get_client() is None \\\n",
" else elasticapm.get_client()\n",
"\n",
"customer_id = 123\n",
"\n",
"first_names = [\"Alice\", \"Bob\", \"Charlie\", \"Diana\", \"Edward\",\n",
" \"Fiona\", \"George\", \"Hannah\", \"Ian\", \"Julia\"]\n",
"last_names = [\"Smith\", \"Johnson\", \"Williams\", \"Brown\", \"Jones\",\n",
" \"Garcia\", \"Miller\", \"Davis\", \"Rodriguez\", \"Martinez\"]\n",
"\n",
"# Function to generate a random full name\n",
"def generate_random_name():\n",
" first_name = random.choice(first_names)\n",
" last_name = random.choice(last_names)\n",
" return f\"{first_name} {last_name}\"\n",
"\n",
"\n",
"customer_question = f\"\"\"My power was out all last week at my home on Grove street.\n",
"When I talked to my neighbor {generate_random_name()},\n",
"they said they got rebate on their bill. Can you do the same for me?\"\"\"\n",
"\n",
"retrieved_best_answer = \"\"\"We are currently offering a $100 rebate for\n",
"customers affected by the recent winter storm. If our records show the\n",
"customer was impacted, tell them they can look forward to a $100 credit on their\n",
"next monthly bill. If the customer believes they were impacted but our records\n",
"don't show this fact, let them know we'll be escalating their case and they\n",
"should expect a call within 24 hours.\"\"\"\n",
"\n",
"\n",
"import time\n",
"def random_service_time(shorter, longer):\n",
" sleep_time = random.uniform(shorter, longer)\n",
" time.sleep(sleep_time)\n",
"\n",
"def days_impacted_check(customer_id):\n",
" apmclient.begin_transaction(\"impact_check\")\n",
" ## simulated sevice call delay (some parts of the lab LLM are cached)\n",
" random_service_time(0.1,0.3)\n",
" days = 5 ## simulated result of a back end service call\n",
" apmclient.end_transaction(\"impact_check\", \"success\")\n",
" if days > 0 :\n",
" return f\"the customer was impacted by the winter storm for {days} serice days\"\n",
" else:\n",
" return \"the customer was not impacted byt he winter storm\"\n",
"\n",
"\n",
"system_prompt = f\"\"\"\n",
"You are an AI customer support agent for a electric power utility company that\n",
"You use the following retrieved approved call script and customer fact\n",
"to answer the customer's question and try to retain them as a customer.\n",
"\n",
"Call script: {retrieved_best_answer}\n",
"\n",
"Our records: {days_impacted_check(customer_id)}\n",
"\"\"\"\n",
"\n",
"def print_light_blue(text):\n",
" print(f'\\033[94m{text}\\033[0m')\n",
"\n",
"def chatCompletion(messages):\n",
"\n",
" client = OpenAI(api_key=openai.api_key, base_url=openai.api_base)\n",
" completion = client.chat.completions.create(\n",
" model=openai.default_model,\n",
" max_tokens=150,\n",
" messages=messages\n",
" )\n",
"\n",
" return completion\n",
"\n",
"def chatWithPowerAgent(prompt):\n",
" apmclient.begin_transaction(\"llm_call\")\n",
"\n",
" elasticapm.label(prompt = prompt)\n",
"\n",
" messages = [\n",
" {\"role\": \"system\", \"content\": system_prompt},\n",
" {\"role\": \"user\", \"content\": prompt}\n",
" ]\n",
" print_light_blue(\"Prompt:\")\n",
" print_light_blue(wrap_text(messages[0][\"content\"],70))\n",
" print_light_blue(wrap_text(messages[1][\"content\"],70))\n",
" completion = chatCompletion(messages)\n",
"\n",
" response_text = completion.choices[0].message.content\n",
"\n",
" apmclient.end_transaction(\"llm_call\", \"success\")\n",
"\n",
" return wrap_text(response_text,70)\n",
"\n",
"\n",
"customer_service_response = chatWithPowerAgent(customer_question)\n",
"\n",
"print(\"Customer Service Response:\")\n",
"print(customer_service_response)\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ILLdLhQzsKY0"
},
"source": [
"## Step 4: Redacting unstructured data with NER Transformer Model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "VsLcvOMGT8Z1"
},
"outputs": [],
"source": [
"from transformers import AutoTokenizer, AutoModelForTokenClassification\n",
"from transformers import pipeline\n",
"import json\n",
"# pretty printing JSON objects\n",
"def json_pretty(input_object):\n",
" print(json.dumps(input_object, indent=1))\n",
"\n",
"tokenizer = AutoTokenizer.from_pretrained(\"dslim/bert-base-NER\")\n",
"model = AutoModelForTokenClassification.from_pretrained(\"dslim/bert-base-NER\")\n",
"nlp = pipeline(\"ner\", model=model, tokenizer=tokenizer)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "wJNzYcnJV0f3"
},
"outputs": [],
"source": [
"ner_results = nlp(customer_question)\n",
"print(ner_results)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "-lD1hCCpGs7H"
},
"source": [
"### Step 5: Let's make an easy to use Redaction Function"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "4ALjsXOCWvId"
},
"outputs": [],
"source": [
"\n",
"def redact_named_entities(text):\n",
" apmclient.begin_transaction(\"redaction_local\")\n",
" # Perform named entity recognition on the text\n",
" entities = nlp(text)\n",
"\n",
" # Sort entities by their start index in reverse order\n",
" entities = sorted(entities, key=lambda x: x['start'], reverse=True)\n",
"\n",
" # Iterate over entities and replace them in the text\n",
" for entity in entities:\n",
" ent_type = entity['entity']\n",
" start = entity['start']\n",
" end = entity['end']\n",
" text = text[:start] + \"<REDACTED>\" + text[end:]\n",
"\n",
"\n",
" apmclient.end_transaction(\"redaction_local\", \"success\")\n",
" return text\n",
"\n",
"# Example usage\n",
"text = \"Alice lives in Paris.\"\n",
"redacted_text = redact_named_entities(text)\n",
"print(redacted_text)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "MeqwLVolG4VQ"
},
"source": [
"## Step 6: Test the function on a customer question"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "awtGm2vsrE3k"
},
"outputs": [],
"source": [
"customer_question = f\"\"\"My power was out all last week at my home at\n",
"Grove street. When I talked to my neighbor {generate_random_name()}, they said they got\n",
"rebate on their bill. Can you do the same for me?\"\"\"\n",
"\n",
"print(redact_named_entities(customer_question))\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "nuiUxbCkYS3b"
},
"source": [
"## Step 7: Alternatively, how would we install the same NER Model into Elasticsarch?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "yn7sULLEYW6w"
},
"outputs": [],
"source": [
"def load_model(model_id, task_type):\n",
" with tempfile.TemporaryDirectory() as tmp_dir:\n",
" print(f\"Loading HuggingFace transformer tokenizer and model [{model_id}] for task [{task_type}]\" )\n",
"\n",
" tm = TransformerModel(model_id=model_id, task_type=task_type)\n",
" model_path, config, vocab_path = tm.save(tmp_dir)\n",
"\n",
" ptm = PyTorchModel(es, tm.elasticsearch_model_id())\n",
" model_exists = es.options(ignore_status=404).ml.get_trained_models(model_id=ptm.model_id).meta.status == 200\n",
"\n",
" if model_exists:\n",
" print(\"Model has already been imported\")\n",
" else:\n",
" print(\"Importing model\")\n",
" ptm.import_model(model_path=model_path, config_path=None, vocab_path=vocab_path, config=config)\n",
" print(\"Starting model deployment\")\n",
" ptm.start()\n",
" print(f\"Model successfully imported with id '{ptm.model_id}'\")\n",
"\n",
"## Model is pre-loaded into Elasticsearch, but this is how you would do it\n",
"\n",
"## load_model(\"dslim/bert-base-NER\", \"ner\")\n",
"print(\"Model is already loaded\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Lg7MWpurb7g2"
},
"source": [
"## Step 8: Define a Redaction Ingest Pipeline in Elasticsearch\n",
"\n",
"We will use the [Elasticsearch Ingest Pipelines](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html) to redact data before it is written to Elasticsearch. These pipelines can also be used to update data in existing indices or for reindexing.\n",
"\n",
"This pipeline:\n",
"- Uses the [inference processor](https://www.elastic.co/guide/en/elasticsearch/reference/current/inference-processor.html) to call the NER model loaded in Part 1 and map the document's `message` field to the field expected by the model: `text_field`.\n",
"- Uses the [Painless scripting language](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-painless.html) from within a [script processor](https://www.elastic.co/guide/en/elasticsearch/reference/current/script-processor.html) to replace the model-detected entities stored in the `ml.inference.entities` array with their class name, and store it within a **new** document field: `redacted`.\n",
"- Uses the [redact processor](https://www.elastic.co/guide/en/elasticsearch/reference/current/redact-processor.html) to identify and redact any supported patterns found within the new redacted field, as well as identifying and redacting a set of custom patterns.\n",
"- Removes the `ml` fields added to the document by the inference processor via the [remove processor](https://www.elastic.co/guide/en/elasticsearch/reference/current/remove-processor.html) as they're no longer needed.\n",
"- Defines a failure condition to capture any errors, just in case we have them.\n",
"\n",
"**NOTE:** As of 8.11, the redact processor is a Technical Preview.\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "4_GKI4ZQd5zw"
},
"outputs": [],
"source": [
"body = {\n",
" \"processors\": [\n",
" {\n",
" \"inference\": {\n",
" \"model_id\": \"dslim__bert-base-ner\",\n",
" \"field_map\": {\n",
" \"message\": \"text_field\"\n",
" }\n",
" }\n",
" },\n",
" {\n",
" \"script\": {\n",
" \"lang\": \"painless\",\n",
" \"source\": \"\"\"\n",
"String msg = ctx['message'];\n",
"for (item in ctx['ml']['inference']['entities'])\n",
" msg = msg.replace(item['entity'], '<' + item['class_name'] + '>');\n",
"ctx['redacted'] = msg;\n",
"\"\"\"\n",
" }\n",
" },\n",
" {\n",
" \"redact\": {\n",
" \"field\": \"redacted\",\n",
" \"patterns\": [\n",
" \"%{EMAILADDRESS:EMAIL}\",\n",
" \"%{IP:IP_ADDRESS}\",\n",
" \"%{CREDIT_CARD:CREDIT_CARD}\",\n",
" \"%{SSN:SSN}\",\n",
" \"%{PHONE:PHONE}\"\n",
" ],\n",
" \"pattern_definitions\": {\n",
" \"CREDIT_CARD\": \"\\\\d{4}[ -]\\\\d{4}[ -]\\\\d{4}[ -]\\\\d{4}\",\n",
" \"SSN\": \"\\\\d{3}-\\\\d{2}-\\\\d{4}\",\n",
" \"PHONE\": \"\\\\d{3}-\\\\d{3}-\\\\d{4}\"\n",
" }\n",
" }\n",
" },\n",
" {\n",
" \"remove\": {\n",
" \"field\": [\n",
" \"ml\"\n",
" ],\n",
" \"ignore_missing\": True,\n",
" \"ignore_failure\": True\n",
" }\n",
" }\n",
" ],\n",
" \"on_failure\": [\n",
" {\n",
" \"set\": {\n",
" \"field\": \"failure\",\n",
" \"value\": \"pii_script-redact\"\n",
" }\n",
" }\n",
" ]\n",
"}\n",
"\n",
"## es.ingest.put_pipeline(id='redact', body=body)\n",
"print(\"Ingest pipeline is already loaded\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Dcv-hhwqlkjD"
},
"source": [
"## Step 9: Test the pipeline\n",
"\n",
"Does it work?\n",
"\n",
"Let's use the [Simulate Pipeline API](https://www.elastic.co/guide/en/elasticsearch/reference/current/simulate-pipeline-api.html) to find out."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "HaXVRsyLhr22"
},
"outputs": [],
"source": [
"docs = [\n",
" {\n",
" \"_source\": {\n",
" \"message\": \"John Smith lives at 123 Main St. Highland Park, CO. His email address \"\\\n",
" \"is jsmith123@email.com and his phone number is 412-189-9043. I found his social \"\\\n",
" \"security number, it is 942-00-1243. Oh btw, his credit card is 1324-8374-0978-2819 \"\\\n",
" \"and his gateway IP is 192.168.1.2\"\n",
" }\n",
" },\n",
" {\n",
" \"_source\": {\n",
" \"message\": \"I had a call with Jane yesterday, she suggested we talk with John \"\\\n",
" \"from Global Systems. Their office is in Springfield\"\n",
" }\n",
" }\n",
"]\n",
"\n",
"pprint(es.ingest.simulate(id='redact', docs=docs).body)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "p6LSsMUyqTdZ"
},
"source": [
"## Step 10: End to End Example, Monitored and Redacted\n",
"\n",
"Switcing back to the local python model ...\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "iL4A3xUNHQ0K"
},
"outputs": [],
"source": [
"customer_question = f\"\"\"My power was out all last week at my home on\n",
"Grove street. When I talked to my neighbor {generate_random_name()}, they said they got\n",
"rebate on their bill. Can you do the same for me?\"\"\"\n",
"\n",
"redacted_text = redact_named_entities(customer_question)\n",
"\n",
"print(chatWithPowerAgent(redacted_text))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KlWAaHh8Qnx9"
},
"source": [
"🛑 Stop Here 🛑\n",
"\n",
"This Ends Lab 2-2\n",
"<hr/>\n",
"\n",
"\n",
"# >>> [Open the Next Lab: https://ela.st/genai-wave2](https://ela.st/genai-wave2)"
]
}
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}