# Using CrewAI with Elasticsearch

This notebook demonstrates how to use CrewAI with Elasticsearch. This notebook is based on the article [Using CrewAI with Elasticsearch](https://www.elastic.co/search-labs/blog/using-crewai-with-elasticsearch).

## Installing dependencies and importing packages

In [None]:
# Python 3.11 is the suggested interpreter version for running this notebook.
# Installs the Elasticsearch Python client (pinned to 8.17) and CrewAI with
# its optional "tools" extra into the kernel's environment.
%pip install elasticsearch==8.17 'crewai[tools]'

In [None]:
import json
import os

from getpass import getpass
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

from crewai import Agent, Crew, Task
from crewai.tools import tool
from crewai_tools import SerperDevTool, WebsiteSearchTool

## Declaring variables

In [None]:
# Collect every credential with getpass so the value is not echoed while
# typing, then expose it through an environment variable.
# The two ELASTIC_* values are read back when the Elasticsearch client is built.
os.environ["ELASTIC_ENDPOINT"] = getpass("Elastic Endpoint: ")
os.environ["ELASTIC_API_KEY"] = getpass("Elastic API Key: ")
# NOTE(review): nothing in this notebook reads the next two variables
# directly; presumably the CrewAI tools / LLM client pick them up from the
# environment - confirm against the CrewAI documentation.
os.environ["SERPER_API_KEY"] = getpass("Serper API Key: ")
os.environ["OPENAI_API_KEY"] = getpass("OpenAI API Key: ")

## Instantiate an Elasticsearch client

In [None]:
# Elasticsearch client
# Shared by the later cells: inference endpoint and index setup, bulk
# indexing, the CrewAI search tool, and the final cleanup.
_client = Elasticsearch(
    hosts=os.environ["ELASTIC_ENDPOINT"],  # endpoint entered in the credentials cell
    api_key=os.environ["ELASTIC_API_KEY"],  # API-key authentication
)

## Creating mappings and inference endpoint

In [None]:
try:
    # Apply a longer timeout and retries to this single request only;
    # `.options()` returns a client copy and leaves `_client` unchanged.
    _client.options(
        request_timeout=60, max_retries=3, retry_on_timeout=True
    ).inference.put(
        task_type="sparse_embedding",
        inference_id="clothes-inference",
        body={
            "service": "elasticsearch",
            "service_settings": {
                "adaptive_allocations": {"enabled": True},
                "num_threads": 1,
                "model_id": ".elser_model_2",
            },
        },
    )

    print("Inference endpoint created successfully.")

except Exception as e:
    # Only API errors carry a structured `info` payload. Connection errors and
    # timeouts do not, so fall back to the exception text instead of raising a
    # secondary AttributeError/KeyError from inside the handler.
    try:
        reason = e.info["error"]["root_cause"][0]["reason"]
    except (AttributeError, KeyError, IndexError, TypeError):
        reason = str(e)

    print(f"Error creating inference endpoint: {reason}")

*NOTE: After creating the inference endpoint, it is highly recommended to wait 30 seconds for the model to be ready before sending requests.*

In [None]:
try:
    # `title` and `description` are copied into `semantic_field`, whose
    # `semantic_text` type is bound to the "clothes-inference" endpoint.
    _client.indices.create(
        index="summer-clothes",
        body={
            "mappings": {
                "properties": {
                    "title": {"type": "text", "copy_to": "semantic_field"},
                    "description": {"type": "text", "copy_to": "semantic_field"},
                    "price": {"type": "float"},
                    "semantic_field": {
                        "type": "semantic_text",
                        "inference_id": "clothes-inference",
                    },
                }
            }
        },
    )

    print("Index created successfully.")

except Exception as e:
    # Only API errors carry a structured `info` payload. Connection errors and
    # timeouts do not, so fall back to the exception text instead of raising a
    # secondary AttributeError/KeyError from inside the handler.
    try:
        reason = e.info["error"]["root_cause"][0]["reason"]
    except (AttributeError, KeyError, IndexError, TypeError):
        reason = str(e)

    print(f"Error creating index: {reason}")

## Indexing documents

In [None]:
# Sample catalogue of summer clothes. Each (title, description, price) row is
# expanded into a dict whose keys match the index mapping.
documents = [
    {"title": title, "description": description, "price": price}
    for title, description, price in (
        (
            "Twist-Detail Crop Top",
            "Fitted crop top in woven, patterned fabric with linen content. Wide shoulder straps, sweetheart neckline, and gathered side seams for a gently draped effect. Twisted detail at center bust, cut-out section at front, and wide smocking at back. Lined",
            34.99,
        ),
        (
            "Rib-knit Tank Top",
            "Short, fitted top in a soft rib knit. Extra-narrow shoulder straps and a square neckline.",
            7.49,
        ),
        (
            "Linen-blend Shorts",
            "Shorts in an airy, woven linen blend. High, ruffle-trimmed waist, narrow drawstring and covered elastic at waistband, and discreet side pockets.",
            13.99,
        ),
        (
            "Twill Cargo Shorts",
            "Fitted shorts in cotton twill with a V-shaped yoke at front and back. High waist, zip fly with button, and patch front pockets.",
            20.99,
        ),
        (
            "Slim Fit Ribbed Tank Top",
            "Slim-fit tank top in medium weight, ribbed cotton-blend jersey with a fitted silhouette. Straight-cut hem.",
            8.49,
        ),
        (
            "Relaxed Fit Linen Resort Shirt",
            "Relaxed-fit shirt in airy linen. Resort collar, buttons without placket, yoke at back, and short sleeves. Straight-cut hem. Fabric made from linen is breathable, looks great when ironed or wrinkled, and softens over time.",
            17.99,
        ),
        (
            "Swim Shorts",
            "Swim shorts in woven fabric. Drawstring and covered elastic at waistband, side pockets, and a back pocket with hook-loop fastener. Small slit at sides. Mesh liner shorts.",
            14.99,
        ),
        (
            "Baggy Fit Cargo Shorts",
            "Baggy-fit cargo shorts in cotton canvas with a generous but not oversized silhouette. Zip fly with button, diagonal side pockets, back pockets with flap and snap fasteners, and bellows leg pockets with snap fasteners.",
            20.99,
        ),
        (
            "Muslin Shorts",
            "Shorts in airy cotton muslin. High, ruffle-trimmed waist, covered elastic at waistband, and an extra-narrow drawstring with a bead at ends. Discreet side pockets.",
            15.99,
        ),
        (
            "Oversized Lyocell-blend Dress",
            "Short, oversized dress in a woven lyocell blend. Gathered, low-cut V-neck with extra-narrow ties at front, 3/4-length, raglan-cut balloon sleeves with narrow elastic at cuffs, and seams at waist and hips with delicate piping. Unlined.",
            38.99,
        ),
    )
]

In [None]:
def build_data():
    """Yield one bulk index action per entry of ``documents``.

    Every action targets the ``summer-clothes`` index and carries the
    document, unchanged, as its ``_source``.
    """
    yield from (
        {"_index": "summer-clothes", "_source": item} for item in documents
    )


try:
    # raise_on_error=False makes the helper return per-document failures in
    # `errors` instead of raising on the first failed chunk. With the default
    # (True) the list is always empty, so the report below could never run.
    success, errors = bulk(_client, build_data(), raise_on_error=False)
    print(f"{success} documents indexed successfully")
    if errors:
        print("Errors during indexing:", errors)

except Exception as e:
    # Anything the helper still raises (e.g. connection problems) lands here.
    print(f"Error: {e}")

## Creating a CrewAI custom tool

In [None]:
@tool("Elasticsearch custom tool")
def elasticsearch_tool(question: str) -> str:
    """
    Search in Elasticsearch using semantic search capabilities.

    Args:
        question (str): The search query to be semantically matched

    Returns:
        str: Concatenated hits from Elasticsearch as string JSON
    """
    # Lexical leg: full-text match of the question against the title.
    lexical_retriever = {"standard": {"query": {"match": {"title": question}}}}

    # Semantic leg: query against the semantic_text field of the index.
    semantic_retriever = {
        "standard": {
            "query": {"semantic": {"field": "semantic_field", "query": question}}
        }
    }

    # Both legs are combined by an "rrf" retriever; only the three catalogue
    # fields are returned for each of the (at most 10) hits.
    search_body = {
        "size": 10,
        "_source": {"includes": ["description", "title", "price"]},
        "retriever": {
            "rrf": {"retrievers": [lexical_retriever, semantic_retriever]}
        },
    }

    matches = _client.search(index="summer-clothes", body=search_body)["hits"]["hits"]

    # An empty string signals "nothing found"; otherwise return pretty-printed JSON.
    if matches:
        return json.dumps([match["_source"] for match in matches], indent=2)

    return ""

In [None]:
# Tools handed to the internet researcher agent, both built with default settings.
# NOTE(review): SerperDevTool presumably reads SERPER_API_KEY from the
# environment (set in the credentials cell) - confirm in the crewai_tools docs.
search_tool = SerperDevTool()
# NOTE(review): judging by the variable name this is used for RAG-style search
# over website content - confirm in the crewai_tools docs.
web_rag_tool = WebsiteSearchTool()

## Setting up agents and tasks

In [None]:
# Agent restricted to the custom Elasticsearch search tool.
es_retriever_agent = Agent(
    role="Retriever",
    goal="Retrieve Elasticsearch documents",
    backstory="You are an expert researcher",
    tools=[elasticsearch_tool],
    verbose=True,
)


# Agent that researches the web with the two tools instantiated in the
# previous cell (search_tool and web_rag_tool).
internet_researcher_agent = Agent(
    role="Research analyst",
    goal="Provide up-to-date market analysis of the industry",
    backstory="You are an expert analyst",
    tools=[search_tool, web_rag_tool],
    verbose=True,
)


# Writing agent; it is given no tools, so it can only work from what the
# crew passes to it.
writer_agent = Agent(
    role="Content Writer",
    goal="Craft engaging blog posts about the information gathered",
    backstory="A skilled writer with a passion for writing about fashion",
    tools=[],
    verbose=True,
)

In [None]:
# Task for the retriever agent: fetch catalogue documents from Elasticsearch.
es_retriever_task = Task(
    description="Retrieve documents from the Elasticsearch index.",
    expected_output="A list of documents retrieved from the Elasticsearch index based on the query.",
    agent=es_retriever_agent,
)

# Task for the research agent: summarise current summer clothing trends.
internet_research_task = Task(
    description="Research the latest trends in the summer clothes industry and provide a summary.",
    expected_output="A summary of the top 5 trending clothes for summer with a unique perspective on their significance.",
    agent=internet_researcher_agent,
)

# The report has to compare BOTH earlier outputs, so they are declared
# explicitly as context instead of relying on the crew's implicit hand-off
# between consecutive tasks.
write_task = Task(
    description="Compare and contrast the information provided from the Retriever agent and the research agent, use cites. Be short",
    expected_output="Short report about Elasticsearch retriever and researcher.",
    agent=writer_agent,
    context=[es_retriever_task, internet_research_task],
    output_file="blog-posts/new_post.md",  # The final blog post will be saved here
)

## Executing tasks

In [None]:
# Assemble the crew; tasks are listed in the order retrieval -> research -> writing.
crew = Crew(
    agents=[es_retriever_agent, internet_researcher_agent, writer_agent],
    tasks=[es_retriever_task, internet_research_task, write_task],
)

# Run the crew. As the last expression of the cell, the result is displayed.
crew.kickoff()

## Deleting

Delete the index and the inference endpoint created in this notebook so they stop consuming cluster resources.

In [None]:
# Ignored status codes are a transport option: in the 8.x client they belong
# on `.options(ignore_status=...)`, as passing `ignore=` to the API method is
# deprecated. 400/404 are ignored so this cell can be re-run safely once the
# resources are already gone.

# Cleanup - Delete Index
# The index mapping references the inference endpoint, so it is removed first.
_client.options(ignore_status=[400, 404]).indices.delete(index="summer-clothes")

# Cleanup - Inference endpoint
_client.options(ignore_status=[400, 404]).inference.delete(
    inference_id="clothes-inference"
)