# Using AutoGen with Elasticsearch

This notebook demonstrates how to use AutoGen with Elasticsearch. It is based on the article [Using AutoGen with Elasticsearch](https://www.elastic.co/search-labs/blog/using-autogen-with-elasticsearch).

## Installing dependencies and importing packages

In [1]:
%pip install autogen elasticsearch==8.17 nest-asyncio


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import json
import os
import nest_asyncio
import requests

from getpass import getpass
from autogen import (
    AssistantAgent,
    GroupChat,
    GroupChatManager,
    UserProxyAgent,
    register_function,
)
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Patch asyncio so that nested event loops are allowed. This is presumably
# needed because the async tool functions defined below are driven from inside
# Jupyter's already-running event loop.
nest_asyncio.apply()

## Setting credentials as environment variables

In [3]:
# Prompt for every credential interactively (getpass does not echo the input)
# and store them as environment variables for the cells below.
os.environ["ELASTIC_ENDPOINT"] = getpass("Elastic Endpoint: ")
os.environ["ELASTIC_API_KEY"] = getpass("Elastic API Key: ")
os.environ["SERPER_API_KEY"] = getpass("Serper API Key: ")
os.environ["OPENAI_API_KEY"] = getpass("OpenAI API Key: ")

## Instantiating an Elasticsearch client

In [None]:
# Shared Elasticsearch client. It authenticates with the API key entered above
# and connects to the endpoint URL collected by the credentials cell.
_client = Elasticsearch(
    api_key=os.environ["ELASTIC_API_KEY"],
    hosts=os.environ["ELASTIC_ENDPOINT"],
)

## Creating the inference endpoint and index mappings

In [None]:
# Create an ELSER sparse-embedding inference endpoint. The index's
# `semantic_text` field references it by id to embed documents and queries.
# The longer timeout and retries are presumably there because deploying the
# model can take a while.
try:
    _client.options(
        request_timeout=60, max_retries=3, retry_on_timeout=True
    ).inference.put(
        task_type="sparse_embedding",
        inference_id="jobs-candidates-inference",
        body={
            "service": "elasticsearch",
            "service_settings": {
                "adaptive_allocations": {"enabled": True},
                "num_threads": 1,
                "model_id": ".elser_model_2",
            },
        },
    )

    print("Inference endpoint created successfully.")

except Exception as e:
    # Only Elasticsearch API errors carry a structured `info` body. Connection
    # or timeout errors do not, so fall back to the exception text rather than
    # raising a second error from inside this handler.
    try:
        reason = e.info["error"]["root_cause"][0]["reason"]
    except (AttributeError, KeyError, IndexError, TypeError):
        reason = str(e)
    print(f"Error creating inference endpoint: {reason}")

In [None]:
# Create the candidates index. Every source field is copied (`copy_to`) into
# `semantic_field`, a `semantic_text` field backed by the
# "jobs-candidates-inference" endpoint. A single semantic query can therefore
# match across all of the fields.
try:
    _client.indices.create(
        index="available-candidates",
        body={
            "mappings": {
                "properties": {
                    "candidate_name": {"type": "text", "copy_to": "semantic_field"},
                    "position_title": {"type": "text", "copy_to": "semantic_field"},
                    "profile_description": {
                        "type": "text",
                        "copy_to": "semantic_field",
                    },
                    "expected_salary": {"type": "text", "copy_to": "semantic_field"},
                    "skills": {"type": "keyword", "copy_to": "semantic_field"},
                    "semantic_field": {
                        "type": "semantic_text",
                        "inference_id": "jobs-candidates-inference",
                    },
                }
            }
        },
    )

    print("index created successfully")
except Exception as e:
    # API errors expose a structured `info` body; any other exception
    # (connection problems, timeouts) falls back to its plain text.
    try:
        reason = e.info["error"]["root_cause"][0]["reason"]
    except (AttributeError, KeyError, IndexError, TypeError):
        reason = str(e)
    print(f"Error creating index: {reason}")

## Indexing documents

In [None]:
# Sample candidate profiles to index into "available-candidates".
# Each dict supplies the fields declared in the index mapping; `skills` is a
# list of values stored in a `keyword` field.
documents = [
    {
        "candidate_name": "John",
        "position_title": "Software Engineer",
        "expected_salary": "$85,000 - $120,000",
        "profile_description": "Experienced software engineer with expertise in backend development, cloud computing, and scalable system architecture.",
        "skills": ["Python", "Java", "AWS", "Microservices", "Docker", "Kubernetes"],
    },
    {
        "candidate_name": "Emily",
        "position_title": "Data Scientist",
        "expected_salary": "$90,000 - $140,000",
        "profile_description": "Data scientist with strong analytical skills and experience in machine learning and big data processing.",
        "skills": ["Python", "SQL", "TensorFlow", "Pandas", "Hadoop", "Spark"],
    },
    {
        "candidate_name": "Michael",
        "position_title": "DevOps Engineer",
        "expected_salary": "$95,000 - $130,000",
        "profile_description": "DevOps specialist focused on automation, CI/CD pipelines, and infrastructure as code.",
        "skills": ["Terraform", "Ansible", "Jenkins", "Docker", "Kubernetes", "AWS"],
    },
    {
        "candidate_name": "Sarah",
        "position_title": "Product Manager",
        "expected_salary": "$110,000 - $150,000",
        "profile_description": "Product manager with a technical background, skilled in agile methodologies and user-centered design.",
        "skills": ["JIRA", "Agile", "Scrum", "A/B Testing", "SQL", "UX Research"],
    },
    {
        "candidate_name": "David",
        "position_title": "UX/UI Designer",
        "expected_salary": "$70,000 - $110,000",
        "profile_description": "Creative UX/UI designer with experience in user research, wireframing, and interactive prototyping.",
        "skills": ["Figma", "Adobe XD", "Sketch", "HTML", "CSS", "JavaScript"],
    },
    {
        "candidate_name": "Jessica",
        "position_title": "Cybersecurity Analyst",
        "expected_salary": "$100,000 - $140,000",
        "profile_description": "Cybersecurity expert with experience in threat detection, penetration testing, and compliance.",
        "skills": [
            "Python",
            "SIEM",
            "Penetration Testing",
            "Ethical Hacking",
            "Nmap",
            "Metasploit",
        ],
    },
    {
        "candidate_name": "Robert",
        "position_title": "Cloud Architect",
        "expected_salary": "$120,000 - $180,000",
        "profile_description": "Cloud architect specializing in designing secure and scalable cloud infrastructures.",
        "skills": ["AWS", "Azure", "GCP", "Kubernetes", "Terraform", "CI/CD"],
    },
    {
        "candidate_name": "Sophia",
        "position_title": "AI/ML Engineer",
        "expected_salary": "$100,000 - $160,000",
        "profile_description": "Machine learning engineer with experience in deep learning, NLP, and computer vision.",
        "skills": ["Python", "PyTorch", "TensorFlow", "Scikit-Learn", "OpenCV", "NLP"],
    },
    {
        "candidate_name": "Daniel",
        "position_title": "QA Engineer",
        "expected_salary": "$60,000 - $100,000",
        "profile_description": "Quality assurance engineer focused on automated testing, test-driven development, and software reliability.",
        "skills": ["Selenium", "JUnit", "Cypress", "Postman", "Git", "CI/CD"],
    },
    {
        "candidate_name": "Emma",
        "position_title": "Technical Support Specialist",
        "expected_salary": "$50,000 - $85,000",
        "profile_description": "Technical support specialist with expertise in troubleshooting, customer support, and IT infrastructure.",
        "skills": [
            "Linux",
            "Windows Server",
            "Networking",
            "SQL",
            "Help Desk",
            "Scripting",
        ],
    },
]

In [None]:
def build_data():
    """Yield one bulk index action per sample candidate document.

    Each action targets the "available-candidates" index and uses the
    candidate dict from ``documents`` as the document source.
    """
    for doc in documents:
        yield {"_index": "available-candidates", "_source": doc}


try:
    # With the default raise_on_error=True, bulk() raises BulkIndexError as
    # soon as any document fails, so `errors` would always be empty here.
    # Disabling it returns the per-document failures for reporting instead.
    success, errors = bulk(_client, build_data(), raise_on_error=False)
    if errors:
        print("Errors during indexing:", errors)
    else:
        print(f"{success} documents indexed successfully")

except Exception as e:
    print(f"Error: {str(e)}")

## AutoGen

### AI endpoint configuration

In [None]:
# OpenAI model settings shared by every agent and by the group chat manager.
ai_endpoint_config = {
    "config_list": [
        {
            "model": "gpt-4o-mini",
            "api_key": os.environ["OPENAI_API_KEY"],
        }
    ]
}
config_list = ai_endpoint_config["config_list"]

## Agents

In [None]:
# Admin: never asks a human for input and runs no code itself. It ends the
# conversation once it receives a message containing "TERMINATE".
user_proxy = UserProxyAgent(
    name="Admin",
    system_message="""You are a human administrator.
        Your role is to interact with agents and tools to execute tasks efficiently.
        Execute tasks and agents in a logical order, ensuring that all agents perform
        their duties correctly. All tasks must be approved by you before proceeding.""",
    human_input_mode="NEVER",
    code_execution_config=False,
    # Guard against messages without text content before searching for the keyword.
    is_termination_msg=lambda msg: msg.get("content") is not None
    and "TERMINATE" in msg["content"],
    llm_config=ai_endpoint_config,
)

# Researcher: finds job offers on the web through the 'search_in_internet' tool.
researcher = AssistantAgent(
    name="Researcher",
    system_message="""You are a Researcher.
        Your role is to use the 'search_in_internet' tool to find individual
        job offers related to the candidates' profiles. Each job offer must include a direct link to a specific position,
        not just a category or group of offers. Ensure that all job offers are relevant and accurate.""",
    llm_config=ai_endpoint_config,
)

# Retriever: pulls candidate profiles through the 'elasticsearch_hybrid_search' tool.
retriever = AssistantAgent(
    name="Retriever",
    llm_config=ai_endpoint_config,
    system_message="""You are a Retriever.
        Your task is to use the 'elasticsearch_hybrid_search' tool to retrieve
        candidate profiles from Elasticsearch.""",
)

# Matcher: pairs the retrieved candidates with the researched job offers.
matcher = AssistantAgent(
    name="Matcher",
    system_message="""Your role is to match job offers with suitable candidates.
        The matches must be accurate and beneficial for both parties.
        Only match candidates with job offers that fit their qualifications.""",
    llm_config=ai_endpoint_config,
)

# Critic: reviews the matches and emits 'TERMINATE', which is the Admin's stop signal.
critic = AssistantAgent(
    name="Critic",
    system_message="""You are the Critic.
        Your task is to verify the accuracy of job-candidate matches.
        If the matches are correct, inform the Admin and include the word 'TERMINATE' to end the process.""",  # End condition
    llm_config=ai_endpoint_config,
)

## Tools setup



In [None]:
async def elasticsearch_hybrid_search(question: str):
    """
    Hybrid candidate search over the "available-candidates" index.

    Combines a lexical ``match`` query on ``position_title`` with a
    ``semantic`` query on ``semantic_field``. The two rankings are fused with
    reciprocal rank fusion (RRF), and at most 10 profiles are returned.

    Args:
        question: Free-text query sent to both retrievers.

    Returns:
        A pretty-printed JSON array with the selected source fields of each
        matching candidate, or an empty string when nothing matches.
    """
    source_fields = [
        "candidate_name",
        "position_title",
        "profile_description",
        "expected_salary",
        "skills",
    ]
    lexical_retriever = {
        "standard": {"query": {"match": {"position_title": question}}}
    }
    semantic_retriever = {
        "standard": {
            "query": {
                "semantic": {"field": "semantic_field", "query": question}
            }
        }
    }

    response = _client.search(
        index="available-candidates",
        body={
            "_source": {"includes": source_fields},
            "size": 10,
            "retriever": {
                "rrf": {"retrievers": [lexical_retriever, semantic_retriever]}
            },
        },
    )

    profiles = [hit["_source"] for hit in response["hits"]["hits"]]
    return json.dumps(profiles, indent=2) if profiles else ""


async def search_in_internet(query: str):
    """Search the internet with the Serper Google Search API.

    Runs ``query`` against Serper. It then runs every "related search"
    suggestion that Serper returns and appends those organic results too.

    Args:
        query: Free-text search query.

    Returns:
        A list of organic-result dicts taken from the ``organic`` arrays of
        the Serper responses, with the results for ``query`` first.

    Raises:
        requests.RequestException: If the primary Serper request fails or
            returns an HTTP error status.
    """
    url = "https://google.serper.dev/search"
    headers = {
        "X-API-KEY": os.environ["SERPER_API_KEY"],
        "Content-Type": "application/json",
    }
    # requests has no default timeout; bound each call so that a stalled
    # request cannot hang the agent conversation indefinitely.
    timeout_seconds = 30

    def _serper(q):
        """POST one query to Serper and return the decoded JSON body."""
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps({"q": q}),
            timeout=timeout_seconds,
        )
        response.raise_for_status()
        return response.json()

    original_results = _serper(query)
    organics = list(original_results.get("organic", []))

    for related in original_results.get("relatedSearches", []):
        related_query = related.get("query")
        if not related_query:
            # Skip malformed suggestions instead of searching for `null`.
            continue
        try:
            organics.extend(_serper(related_query).get("organic", []))
        except (requests.RequestException, ValueError):
            # Related searches only enrich the results, so a failure in one
            # of them must not discard what was already collected.
            continue

    return organics

In [None]:
# Register each tool so that its `caller` agent can request it while the Admin
# proxy (`executor`) runs it.
register_function(
    elasticsearch_hybrid_search,
    caller=retriever,
    executor=user_proxy,
    name="elasticsearch_hybrid_search",
    description="A method to retrieve candidate profiles from Elasticsearch using hybrid (keyword and semantic) search",
)

register_function(
    search_in_internet,
    caller=researcher,
    executor=user_proxy,
    name="search_in_internet",
    description="A method to search the internet",
)

## Starting agents

In [None]:
# Put all five agents in one group chat, capped at 50 rounds. The run should
# stop earlier once the Admin receives a message containing "TERMINATE".
team = [user_proxy, researcher, retriever, matcher, critic]
groupchat = GroupChat(agents=team, messages=[], max_round=50)

manager = GroupChatManager(groupchat=groupchat, llm_config=ai_endpoint_config)

task_message = """Compare the candidate profiles retrieved by the Retriever with the job offers
        found by the Researcher on the internet.
        Both candidate profiles and job offers are related to the software industry.
        Ensure that each match is accurate and beneficial for both parties.
        Each candidate should be matched with a single job offer.
        Include the job offer link provided by the Researcher."""

user_proxy.initiate_chat(manager, message=task_message)

## Cleaning environment

Delete the index and the inference endpoint created in this notebook so they stop consuming cluster resources.

In [None]:
# Ignore 400/404 responses so the cell also succeeds when the resources are
# already gone. In elasticsearch-py 8, transport options such as ignored
# statuses go through `.options()`; passing `ignore=` to API methods is deprecated.
cleanup_client = _client.options(ignore_status=[400, 404])

# Cleanup - Delete Index. Delete it first: its `semantic_text` field references
# the inference endpoint, which presumably cannot be removed while in use.
cleanup_client.indices.delete(index="available-candidates")

# Cleanup - Inference endpoint
cleanup_client.inference.delete(inference_id="jobs-candidates-inference")