## Retrieve and Rerank

In this example we will:
* index a BEIR dataset to Elasticsearch
* retrieve data with BM25
* optimize relevance with a reranking module running locally to our machine

Regarding the last point: although we focus on small reranking models, running this notebook on a machine with access to a GPU will speed up the execution.

## Requirements

For this notebook you will need an **Elastic deployment** (we will be using [Elastic Cloud](https://www.elastic.co/guide/en/cloud/current/ec-getting-started.html); if you don't have a deployment, see below to set up a free trial), **Python 3.10.x** or later, and the following **Python dependencies**:
- `elasticsearch` (Elastic's Python client)
- `sentence-transformers` (to load the reranking module locally)
- `datasets` (Hugging Face's library to download datasets with minimal effort)
- `pytrec_eval` (Needed to compute accuracy scores such as `nDCG@10`)

## Create Elastic Cloud deployment

If you don't have an Elastic Cloud deployment, sign up [here](https://cloud.elastic.co/registration?onboarding_token=vectorsearch&utm_source=github&utm_content=elasticsearch-labs-notebook) for a free trial.
Once logged in to your Elastic Cloud account, go to the [Create deployment](https://cloud.elastic.co/deployments/create) page and select **Create deployment**. Leave all settings with their default values.





## Installing packages

Let's start by installing the necessary Python libraries (preferably in a virtual environment)


In [None]:
!pip install -U elasticsearch sentence-transformers datasets pytrec_eval

and let's gradually build our code structure

In [None]:
from collections import defaultdict
from getpass import getpass
from typing import Any, Union

from datasets.arrow_dataset import Dataset
from datasets.dataset_dict import DatasetDict, IterableDatasetDict
from datasets.iterable_dataset import IterableDataset
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from sentence_transformers import CrossEncoder
from tqdm import tqdm
import datasets
import numpy as np
import pytrec_eval

Before we dive deeper into the code, let's set the dataset name as a constant variable in our script. 

In [None]:
DATASET = "trec-covid"
INDEX_NAME = f"reranking-test-{DATASET}"

Let us also define, once, the necessary credentials required to access the Elastic Cloud deployment

In [None]:
ELASTIC_CLOUD_ID = getpass("Elastic Cloud ID: ")
ELASTIC_API_KEY = getpass("Elastic API key: ")

and initialize the Elasticsearch Python client

In [None]:
client = Elasticsearch(
    cloud_id=ELASTIC_CLOUD_ID,
    api_key=ELASTIC_API_KEY,
)

### Test the client

Before you continue, confirm that the client has connected with this test.


In [None]:
client_info = client.info()

f"Successfully connected to cluster {client_info['cluster_name']} (version {client_info['version']['number']})"

---

## Helper functions

In this section we define some helper functions to increase the readability of our code.

Let's start with the functions that will handle the interaction with our Elastic Cloud deployment such as: 
- creating an index
- storing the documents
- retrieving documents with BM25

In [None]:
def create_index(es_client: Elasticsearch, name: str, analyzer: str = "english"):
    """
    Create an index in our deployment

    Args:
        `es_client`: An instance of a Python Elasticsearch client
        `name`: The name of the index to create
        `analyzer`: A string identifier of the language analyzer to be used. By default we use `english`
            (more details at https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-lang-analyzer.html)
    Returns:
        None
    """

    # we store `title` & `text` in separate fields (`title` and `txt`)
    _mappings = {
        "properties": {
            "title": {"type": "text", "analyzer": analyzer},
            "txt": {"type": "text", "analyzer": analyzer},
        }
    }

    # create an index with the specified name; a 400 response
    # (e.g. the index already exists) is ignored instead of raising
    es_client.options(ignore_status=[400]).indices.create(
        index=name,
        settings={"number_of_shards": 1},
        mappings=_mappings,
    )


def index_corpus(
    corpus: Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset],
    index_name: str,
    es_client: Elasticsearch,
):
    """
    Pushing documents over to our index

    Args:
        `corpus`: The corpus of the dataset we have selected. It's a Hugging Face dataset with the three fields (`_id`, `title`, `text`)
        `index_name`: The name of the Elasticsearch index
        `es_client`: An instance of a Python Elasticsearch client
    Returns:
        None
    """

    def get_iterable():
        for docid, doc_title, doc_txt in tqdm(
            zip(corpus["_id"], corpus["title"], corpus["text"]), total=corpus.num_rows
        ):
            yield {
                "_id": docid,
                "_op_type": "index",
                "title": doc_title,
                "txt": doc_txt,
            }

    # and bulk index them
    bulk(client=es_client, index=index_name, actions=get_iterable(), max_retries=3)

    # making sure that the index has been refreshed
    es_client.indices.refresh(index=index_name)

In [None]:
def retrieve(
    queries: Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset],
    es_client: Elasticsearch,
    index_name: str,
    size: int = 10,
    batch_size: int = 32,
):
    """
    Retrieve docs from the index by matching the query against `title` and `txt` separately
    Args:
        `queries`: The queries of the dataset we have selected. It's a Hugging Face dataset with the two fields (`_id`, `text`)
        `es_client`: An instance of a Python Elasticsearch client
        `index_name`: The name of the Elasticsearch index
        `size`: The (maximum) number of documents that we will retrieve per query
        `batch_size`: It represents the number of queries we can send per request.

    Returns:
        A nested dictionary where the outer key is the "query id" that points to (<doc_id>, <BM25-score>) key-value pairs e.g.
        {"my_query_id_1": {"my_doc_1": 23.5, "my_doc_2": 11.33}, "my_query_id_22": {"my_doc_3": 20.5, "my_doc_4": 4.3}, ...}

    """

    def generate_request(query_text: str):
        """Create the request body for the ES requests"""
        return {
            "_source": False,
            "query": {
                "multi_match": {
                    "query": query_text,
                    "type": "best_fields",
                    "fields": ["title", "txt"],
                    "tie_breaker": 0.5,
                }
            },
            "size": size,
        }

    def retrieve_batch(query_ids, es_requests):
        """Get docs for a mini-batch of requests"""
        batch_dict = dict()
        kwargs: dict[str, Any] = {
            "index": index_name,
            "search_type": "dfs_query_then_fetch",
        }
        try:
            es_response = es_client.msearch(searches=es_requests, **kwargs)
            for qid, resp in zip(query_ids, es_response["responses"]):
                batch_dict[qid] = {
                    hit["_id"]: hit["_score"] for hit in resp["hits"]["hits"]
                }
        except Exception as e:
            print(str(e))
        return batch_dict

    qids, requests = [], []
    es_responses = dict()

    for query in queries:
        qids.append(query["_id"])
        requests.append({})
        requests.append(generate_request(query["text"]))

        # retrieve in batches
        if len(qids) == batch_size:
            es_responses.update(retrieve_batch(qids, requests))
            qids = []
            requests = []

    # check for leftovers
    if len(qids) > 0:
        es_responses.update(retrieve_batch(qids, requests))
        qids, requests = [], []

    return es_responses
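
The `msearch` API takes a flat list in which every search occupies two consecutive entries: a header (left empty in `retrieve`, because the index is passed as a request parameter) followed by the search body. The sketch below builds that list for two made-up queries without contacting Elasticsearch. `build_msearch_body` is a hypothetical helper introduced only for illustration; it is not part of the notebook or of the Elasticsearch client.

```python
def build_msearch_body(query_texts, size=10):
    """Return the flat [header, body, header, body, ...] list that msearch expects."""
    searches = []
    for text in query_texts:
        # empty header: the target index is supplied with the request itself
        searches.append({})
        # same request body as `generate_request` in `retrieve`
        searches.append(
            {
                "_source": False,
                "query": {
                    "multi_match": {
                        "query": text,
                        "type": "best_fields",
                        "fields": ["title", "txt"],
                        "tie_breaker": 0.5,
                    }
                },
                "size": size,
            }
        )
    return searches


# illustrative queries, not taken from the dataset
searches = build_msearch_body(["coronavirus origin", "mask effectiveness"], size=5)
print(len(searches))  # two entries per query
```

Because headers and bodies alternate, the list is always twice as long as the number of queries, which is why `retrieve` tracks the batch size through `qids` rather than through `requests`.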

Then, we move to functions that rely on Hugging Face's `datasets` library to fetch the `corpus`, `queries` and `qrels` files

In [None]:
def download_corpus(
    dataset_name: str,
) -> Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset]:
    """
    Download corpus from Hugging Face
    Args:
        `dataset_name`: The name of the BEIR dataset that we have selected
    Returns:
        An instance of a Hugging Face dataset
    """

    mteb_dataset_name = f"mteb/{dataset_name}"

    # Dataset({
    #     features: ['_id', 'title', 'text'],
    #     num_rows: 25657
    # })
    corpus = datasets.load_dataset(mteb_dataset_name, "corpus", split="corpus")

    return corpus


def download_queries_and_qrels(dataset_name: str):
    """
    Download queries, qrels from Hugging Face
    Args:
        `dataset_name`: The name of the BEIR dataset that we have selected
    Returns:
        A tuple of: (<an instance of a Hugging Face dataset>, <a dictionary holding the qrels information>)
    """

    mteb_dataset_name = f"mteb/{dataset_name}"
    qrels_raw = datasets.load_dataset(
        mteb_dataset_name,
        "default",
        split="test" if dataset_name != "msmarco" else "dev",
    )

    # convert to `pytrec_eval` compatible format
    qrels = defaultdict(dict)
    for q in qrels_raw:
        qrels[q["query-id"]][q["corpus-id"]] = int(q["score"])

    queries = datasets.load_dataset(
        mteb_dataset_name, "queries", split="queries"
    ).filter(lambda r: r["_id"] in qrels)

    return queries, dict(qrels)
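
To make the qrels conversion concrete, here is a small sketch on made-up rows. The field names (`query-id`, `corpus-id`, `score`) match the ones used in `download_queries_and_qrels`; the ids and scores are invented for illustration only.

```python
from collections import defaultdict

# made-up rows that mimic the fields of the qrels split
qrels_rows = [
    {"query-id": "q1", "corpus-id": "d1", "score": 2},
    {"query-id": "q1", "corpus-id": "d7", "score": 0},
    {"query-id": "q2", "corpus-id": "d3", "score": 1},
]

# nested dictionary: query id -> {doc id -> relevance score}
toy_qrels = defaultdict(dict)
for row in qrels_rows:
    toy_qrels[row["query-id"]][row["corpus-id"]] = int(row["score"])
toy_qrels = dict(toy_qrels)

# keep only the queries that have at least one judgment
toy_queries = [
    {"_id": "q1", "text": "first query"},
    {"_id": "q2", "text": "second query"},
    {"_id": "q3", "text": "query without judgments"},
]
kept_queries = [q for q in toy_queries if q["_id"] in toy_qrels]

print(toy_qrels)
print([q["_id"] for q in kept_queries])
```

Query `q3` is dropped because it has no entry in the qrels, which mirrors the `.filter(...)` call on the real queries.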

---

## Running the pipeline

Now, we can execute the "retrieve and rerank" pipeline step by step

### Corpus to our Elasticsearch index

First, we create the index that will host the corpus

In [None]:
create_index(name=INDEX_NAME, es_client=client)

Then, we download the corpus and push it into the index

In [None]:
corpus = download_corpus(dataset_name=DATASET)

In [None]:
index_corpus(es_client=client, corpus=corpus, index_name=INDEX_NAME)

Let's move to the retrieval part

### 1st stage retrieval with BM25

First, we download the `test` split of the dataset we have selected

In [None]:
queries, qrels = download_queries_and_qrels(dataset_name=DATASET)

* The `queries` file is a Hugging Face dataset with two keys ['_id', 'text'],
* The `qrels` file contains the relationships between a `query_id` and a list of documents. We have transformed it into a `pytrec_eval`-compatible format, i.e. a nested dictionary where the outer key is the query id that points to a dictionary with (`doc_id`, `score`) key-value pairs (a score >0 denotes relevance)

In [None]:
len(queries)

Now, let's retrieve the **top-100** documents per query using BM25

In [None]:
bm25_responses = retrieve(
    queries=queries, index_name=INDEX_NAME, size=100, es_client=client
)

And finally, let's compute the performance of BM25 on this dataset. We are using `nDCG@10` as our metric

In [None]:
# specify evaluator
METRICS_TO_EVALUATE = {"ndcg_cut_10"}
evaluator = pytrec_eval.RelevanceEvaluator(qrels, METRICS_TO_EVALUATE)


# get score per query
eval_per_query = evaluator.evaluate(bm25_responses)


# aggregate scores across queries
eval_scores = defaultdict(list)

for _, vals in eval_per_query.items():
    for metric, metric_score in vals.items():
        eval_scores[metric].append(metric_score)

for metric, _scores in eval_scores.items():
    print(f"{metric}: {np.mean(_scores)}")
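
For intuition about what `nDCG@10` measures, the sketch below computes nDCG@k by hand for a single made-up query. It assumes linear gains and a `log2(rank + 1)` discount and ignores tie handling, so treat it as an illustration of the idea rather than a reimplementation of `pytrec_eval`; `dcg_at_k` and `ndcg_at_k` are hypothetical helper names.

```python
import math


def dcg_at_k(gains, k):
    """Discounted cumulative gain over the first k gains (ranks start at 1)."""
    return sum(g / math.log2(rank + 1) for rank, g in enumerate(gains[:k], start=1))


def ndcg_at_k(run, qrel, k=10):
    """nDCG@k for one query; `run` maps doc id -> score, `qrel` maps doc id -> relevance."""
    ranked = sorted(run.items(), key=lambda x: x[1], reverse=True)
    # unjudged documents count as non-relevant
    gains = [qrel.get(doc_id, 0) for doc_id, _ in ranked]
    ideal_dcg = dcg_at_k(sorted(qrel.values(), reverse=True), k)
    return dcg_at_k(gains, k) / ideal_dcg if ideal_dcg > 0 else 0.0


# made-up judgments and retrieval scores for one query
toy_qrel = {"d1": 2, "d2": 1, "d3": 0}
toy_run = {"d2": 9.1, "d4": 7.0, "d1": 3.2}

toy_ndcg = ndcg_at_k(toy_run, toy_qrel, k=10)
print(round(toy_ndcg, 4))
```

In this toy run the most relevant document (`d1`) only appears at rank 3, so the score is about 0.76 instead of 1.0; moving it to the top is exactly the kind of improvement a reranker aims for.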

### 2nd stage reranking

Now, let's move to the reranking part. In this example we are using a small cross-encoder model to optimize the ordering of our results. We will use the `sentence-transformers` library to load the model and do the scoring

In [None]:
reranking_model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", max_length=512)

Some helper structures to speed up processing

In [None]:
queries_dict = {q["_id"]: q["text"] for q in queries}
corpus_dict = {doc["_id"]: f"{doc['title']} {doc['text']}" for doc in corpus}

and now it's time for the reranking part

In [None]:
results_after_reranking = dict()

for qid, bm25_res in tqdm(bm25_responses.items(), total=len(bm25_responses)):

    query_text = queries_dict[qid]
    doc_ids = [doc_id for doc_id, _ in bm25_res.items()]
    if len(doc_ids) == 0:
        results_after_reranking[qid] = dict()
        continue

    doc_texts = [corpus_dict[doc_id] for doc_id in doc_ids]

    # rescore with the reranking model
    scores = reranking_model.predict([(query_text, doc_text) for doc_text in doc_texts])

    results_after_reranking[qid] = {
        doc_id: float(score) for doc_id, score in zip(doc_ids, scores)
    }
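
Note that the loop keeps the BM25 candidate set unchanged and only replaces the scores; the evaluator then orders documents by the new scores. The sketch below shows the same pattern on toy data with a deliberately naive stand-in scorer (token overlap), so that it runs without downloading a model. `overlap_score` and `rerank` are hypothetical names and the scorer is **not** a cross-encoder.

```python
def overlap_score(query, doc):
    """Stand-in scorer: fraction of query tokens that also appear in the document."""
    q_tokens = set(query.lower().split())
    d_tokens = set(doc.lower().split())
    return len(q_tokens & d_tokens) / len(q_tokens)


def rerank(query, candidates, score_fn):
    """Rescore first-stage candidates; returns {doc_id: new_score}."""
    return {doc_id: float(score_fn(query, text)) for doc_id, text in candidates.items()}


# made-up query and first-stage candidates
toy_query = "vaccine side effects"
toy_candidates = {
    "d1": "history of the vaccine",
    "d2": "common side effects of the vaccine",
    "d3": "hospital staffing levels",
}

reranked = rerank(toy_query, toy_candidates, overlap_score)
new_order = sorted(reranked, key=reranked.get, reverse=True)
print(new_order)
```

Swapping `overlap_score` for a function that wraps `reranking_model.predict` would give the behaviour of the notebook's loop, one query at a time.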

and let's calculate the metric scores for the reranked results

In [None]:
post_reranking_eval_scores_per_query = evaluator.evaluate(results_after_reranking)

post_reranking_eval_scores = defaultdict(list)

for qid, vals in post_reranking_eval_scores_per_query.items():
    for metric, metric_score in vals.items():
        post_reranking_eval_scores[metric].append(metric_score)

for metric, scores in post_reranking_eval_scores.items():
    print(f"{metric}: {np.mean(scores)}")

In most cases reranking gives a noticeable boost in `nDCG@10` over the BM25 baseline; compare this value with the one obtained after the 1st stage.

## Bonus section

### Judge rate
Let's do some extra analysis and try to answer the question `"How many times is an evaluator presented with (query, document) pairs for which there is no ground truth information?"`
In other words, we calculate the percentage of cases where the `qrels` file contains a relevance score for a particular document in the result list.
Let's start with BM25 by focusing on the **top-10** retrieved documents

In [None]:
TOP_K = 10

judge_rate_per_query = []

for qid, doc_scores in bm25_responses.items():
    top_k_doc_ids = [
        doc_id
        for doc_id, score in sorted(
            doc_scores.items(), key=lambda x: x[1], reverse=True
        )[:TOP_K]
    ]
    if len(top_k_doc_ids) == 0:
        continue

    nr_labeled_docs = sum(1 for doc_id in top_k_doc_ids if doc_id in qrels[qid])
    judge_rate_per_query.append(nr_labeled_docs / len(top_k_doc_ids))

# `.1f` keeps fixed-point output; `.3` would print a rate of 100.0 as `1e+02`
print(f'"Judge rate" for {DATASET} is {np.mean(judge_rate_per_query) * 100.0:.1f}%')

while for the reranked documents it is:

In [None]:
judge_rate_per_query = []

for qid, doc_scores in results_after_reranking.items():
    top_k_doc_ids = [
        doc_id
        for doc_id, score in sorted(
            doc_scores.items(), key=lambda x: x[1], reverse=True
        )[:TOP_K]
    ]
    if len(top_k_doc_ids) == 0:
        continue

    nr_labeled_docs = sum(1 for doc_id in top_k_doc_ids if doc_id in qrels[qid])
    judge_rate_per_query.append(nr_labeled_docs / len(top_k_doc_ids))

print(
    f'"Judge rate" for {DATASET} (reranked) is {np.mean(judge_rate_per_query) * 100.0:.1f}%'
)
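
The two cells above differ only in their input, so the computation can be factored into one function. This is a minimal sketch with made-up data; `judge_rate` is a hypothetical helper and uses plain Python instead of `numpy` to stay self-contained.

```python
def judge_rate(responses, qrels, top_k=10):
    """Average fraction of the top-k results per query that have a relevance judgment."""
    rates = []
    for qid, doc_scores in responses.items():
        top_docs = sorted(doc_scores, key=doc_scores.get, reverse=True)[:top_k]
        if not top_docs:
            continue  # queries without results are skipped, as in the cells above
        judged = qrels.get(qid, {})
        rates.append(sum(doc_id in judged for doc_id in top_docs) / len(top_docs))
    return sum(rates) / len(rates) if rates else 0.0


# made-up retrieval results and judgments
toy_responses = {
    "q1": {"d1": 3.0, "d2": 2.0, "d3": 1.0, "d4": 0.5},
    "q2": {"d5": 1.0, "d6": 0.9},
}
toy_judgments = {
    "q1": {"d1": 1, "d3": 0},
    "q2": {"d5": 1, "d6": 0},
}

toy_rate = judge_rate(toy_responses, toy_judgments, top_k=2)
print(f"{toy_rate * 100.0:.1f}%")
```

With `top_k=2`, query `q1` has one judged document out of two and `q2` has two out of two, so the average is 75%. A document with a judgment of `0` still counts as judged: the rate measures label coverage, not relevance.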

---

### Confidence intervals

In this section we will briefly touch upon the concepts of `confidence intervals` and `statistical significance` and we will see how we can use them to determine whether improvements in our pipelines are significant or not.

We can think of it as follows: Our goal is to estimate the performance of our pipeline (retrieval and/or reranking) on a target corpus. Ideally, we would like to have access to **all** queries that our end-users will run against it but of course this is impossible. Instead, we have the set of test queries provided by the benchmark and we implicitly assume that the performance on this set can act as an accurate proxy of the overall performance (in the ideal scenario).

But we can make some extra assumptions to increase the reliability of our analysis. [Confidence intervals](https://en.wikipedia.org/wiki/Confidence_interval), a concept from statistical theory, give us a tool to quantify our uncertainty. By fixing a confidence level (95% in this example) we can derive a range of values that is likely to contain the parameter of interest (here, the performance in the **ideal** scenario). More precisely, if we repeated the same process many times, each time drawing a different test set, about 95% of the resulting intervals would contain the true value.

The code below shows an example of deriving confidence intervals using [bootstrapping](https://en.wikipedia.org/wiki/Bootstrapping_\(statistics\)) combined with the `percentile` method. Note that the width of the interval depends heavily on the number and the variability of the queries in the dataset: larger query sets tend to give narrower intervals, and vice versa.
 

In [None]:
def get_ci_with_bootstrapping(scores: list, nr_bootstraps=1000, percentile=95):
    """
    Compute confidence intervals using bootstrapping and the percentile method
    Args:
        `scores`: The list of scores to be averaged
        `nr_bootstraps`: The number of bootstrap samples to collect
        `percentile`: The type of confidence interval to compute. It should be a number in (0, 100),
            by default it computes 95% CI
    Returns:
        The confidence interval
    """
    estimates = []
    for _ in range(nr_bootstraps):
        sample = np.random.choice(scores, len(scores), replace=True)
        estimates.append(np.mean(sample))

    half_percentile = (100.0 - percentile) / 2.0
    return np.percentile(estimates, [half_percentile, 100.0 - half_percentile])

and we can apply it to our results as follows:

In [None]:
ndcg_scores = post_reranking_eval_scores["ndcg_cut_10"]
get_ci_with_bootstrapping(ndcg_scores, percentile=95, nr_bootstraps=1000)

A common, informal way to read this is that we are 95% confident that the `nDCG@10` score in the ideal scenario lies within that interval.
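
To illustrate how the number of queries affects the interval, the sketch below bootstraps two synthetic score lists of different sizes. It uses only the standard library; `bootstrap_ci` is a hypothetical stand-in for `get_ci_with_bootstrapping` that picks percentiles by rank instead of interpolating, and the scores are random numbers, not real `nDCG@10` values.

```python
import random
import statistics


def bootstrap_ci(scores, nr_bootstraps=1000, confidence=95, seed=0):
    """Percentile bootstrap CI of the mean (rank-based percentiles, no interpolation)."""
    rng = random.Random(seed)
    estimates = sorted(
        statistics.fmean(rng.choices(scores, k=len(scores)))
        for _ in range(nr_bootstraps)
    )
    tail = (100.0 - confidence) / 200.0
    lower = estimates[int(tail * nr_bootstraps)]
    upper = estimates[min(int((1.0 - tail) * nr_bootstraps), nr_bootstraps - 1)]
    return lower, upper


# synthetic per-query scores drawn from the same distribution
rng = random.Random(42)
few_queries = [rng.random() for _ in range(20)]
many_queries = [rng.random() for _ in range(2000)]

low_few, high_few = bootstrap_ci(few_queries)
low_many, high_many = bootstrap_ci(many_queries)

width_few = high_few - low_few
width_many = high_many - low_many
print(f"20 queries:   width = {width_few:.3f}")
print(f"2000 queries: width = {width_many:.3f}")
```

Both lists come from the same distribution, yet the interval for the larger list is much narrower, which is why results on benchmarks with few test queries should be read with extra care.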

Confidence intervals can be used in the context of significance testing. For example, if we wanted to compare two pipelines (retrieval and/or reranking) on a dataset one way to do this would be to:
* Decide on a confidence level (e.g. 90% or 95%)
* Compute confidence intervals for the performance of model A 
* Compute confidence intervals for the performance of model B
* Check whether there is an overlap between the two intervals. 

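In the last step, if there is **no** overlap we can say that the observed difference in performance between the two pipelines is **statistically significant** at the chosen confidence level. The reverse does not hold: overlapping intervals alone do not show that the difference is insignificant, so this check is conservative.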
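
The steps above can be sketched as follows. The per-query scores are synthetic (drawn from two normal distributions and clipped to [0, 1]) and stand in for the `nDCG@10` values of two pipelines; `bootstrap_ci` and `intervals_overlap` are hypothetical helpers written with the standard library only.

```python
import random
import statistics


def bootstrap_ci(scores, nr_bootstraps=1000, confidence=95, seed=0):
    """Percentile bootstrap CI of the mean (rank-based percentiles, no interpolation)."""
    rng = random.Random(seed)
    estimates = sorted(
        statistics.fmean(rng.choices(scores, k=len(scores)))
        for _ in range(nr_bootstraps)
    )
    tail = (100.0 - confidence) / 200.0
    lower = estimates[int(tail * nr_bootstraps)]
    upper = estimates[min(int((1.0 - tail) * nr_bootstraps), nr_bootstraps - 1)]
    return lower, upper


def intervals_overlap(ci_a, ci_b):
    """True if the two closed intervals share at least one point."""
    return ci_a[0] <= ci_b[1] and ci_b[0] <= ci_a[1]


# synthetic per-query scores for two pipelines (illustrative only)
rng = random.Random(7)
scores_a = [min(1.0, max(0.0, rng.gauss(0.55, 0.1))) for _ in range(200)]
scores_b = [min(1.0, max(0.0, rng.gauss(0.70, 0.1))) for _ in range(200)]

# same confidence level for both pipelines
ci_a = bootstrap_ci(scores_a, confidence=95)
ci_b = bootstrap_ci(scores_b, confidence=95)

significant = not intervals_overlap(ci_a, ci_b)
print(f"A: {ci_a}, B: {ci_b}, significant: {significant}")
```

In the notebook, `scores_a` and `scores_b` would be the per-query lists `eval_scores["ndcg_cut_10"]` and `post_reranking_eval_scores["ndcg_cut_10"]`.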