This notebook shows how to ingest and search images using ColPali with Elasticsearch. Read our accompanying blog post on [ColPali in Elasticsearch](elastiacsearch-colpali-visual-document-search) for more context on this notebook. 

We will be using images from the [ViDoRe benchmark](https://huggingface.co/collections/vidore/vidore-benchmark-667173f98e70a1c0fa4db00d) as example data. 

The URL and API key for your Elasticsearch cluster are expected in a file `elastic.env` in this format: 
```
ELASTIC_HOST=<cluster-url>
ELASTIC_API_KEY=<api-key>
```
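
If one of the two settings is missing, the Elasticsearch client will fail later with a less obvious error. As a minimal sketch, a small check like the following could be run right after `load_dotenv("elastic.env")`. The helper name `missing_settings` is hypothetical and not part of the notebook.

```python
import os

# The two settings this notebook expects to find in elastic.env.
REQUIRED_SETTINGS = ("ELASTIC_HOST", "ELASTIC_API_KEY")


def missing_settings(env=os.environ):
    """Return the names of required settings that are unset or empty."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]
```

Calling `missing_settings()` after the environment file has been loaded returns an empty list when both values are present.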

In [1]:
!pip install -r requirements.txt
from IPython.display import clear_output

clear_output()  # for less space usage.

First, we load the sample data from Hugging Face and save the images to disk.

In [2]:
from datasets import load_dataset
from tqdm.notebook import tqdm
import os

DATASET_NAME = "vidore/infovqa_test_subsampled"
DOCUMENT_DIR = "searchlabs-colpali"

os.makedirs(DOCUMENT_DIR, exist_ok=True)
dataset = load_dataset(DATASET_NAME, split="test")

for i, row in enumerate(tqdm(dataset, desc="Saving images to disk")):
    image = row.get("image")
    image_name = f"image_{i}.jpg"
    image_path = os.path.join(DOCUMENT_DIR, image_name)
    image.save(image_path)

Saving images to disk:   0%|          | 0/500 [00:00<?, ?it/s]

Here we load the ColPali model and define functions to generate vectors from images and text. 

In [3]:
import torch
from PIL import Image
from colpali_engine.models import ColPali, ColPaliProcessor

model_name = "vidore/colpali-v1.3"
model = ColPali.from_pretrained(
    model_name,
    torch_dtype=torch.float32,
    device_map="mps",  # "mps" for Apple Silicon, "cuda" if available, "cpu" otherwise
).eval()

col_pali_processor = ColPaliProcessor.from_pretrained(model_name)


def create_col_pali_image_vectors(image_path: str) -> list:
    batch_images = col_pali_processor.process_images([Image.open(image_path)]).to(
        model.device
    )

    with torch.no_grad():
        return model(**batch_images).tolist()[0]


def create_col_pali_query_vectors(query: str) -> list:
    queries = col_pali_processor.process_queries([query]).to(model.device)
    with torch.no_grad():
        return model(**queries).tolist()[0]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]
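
The cell above hard-codes `device_map="mps"`, which only works on Apple Silicon. One possible way to choose the device automatically is sketched below. The helper `pick_device` is hypothetical, and it falls back to `"cpu"` when PyTorch is not importable.

```python
def pick_device() -> str:
    """Pick "cuda", "mps", or "cpu" depending on what this machine offers."""
    try:
        import torch
    except ImportError:
        # Without PyTorch there is nothing to accelerate; report CPU.
        return "cpu"

    if torch.cuda.is_available():
        return "cuda"
    # Older PyTorch builds may not expose the MPS backend at all.
    mps_backend = getattr(torch.backends, "mps", None)
    if mps_backend is not None and mps_backend.is_available():
        return "mps"
    return "cpu"
```

The result could then be passed as `device_map=pick_device()` when loading the model.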

Next, we iterate over all images, create the multi-vectors for each one with the ColPali model, and save the results to disk.

In [4]:
import os
import time
import pickle

images = [os.path.join(DOCUMENT_DIR, f) for f in os.listdir(DOCUMENT_DIR)]
file_to_multi_vectors = {}

for image_path in tqdm(images, desc="Create ColPali Vectors"):
    file_name = os.path.basename(image_path)
    vectors_f32 = create_col_pali_image_vectors(image_path)
    file_to_multi_vectors[file_name] = vectors_f32

with open("col_pali_vectors.pkl", "wb") as f:
    pickle.dump(file_to_multi_vectors, f)

print(f"Saved {len(file_to_multi_vectors)} vector entries to disk")

Create ColPali Vectors:   0%|          | 0/500 [00:00<?, ?it/s]

Saved 500 vector entries to disk
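
Each entry in `file_to_multi_vectors` is a list of vectors rather than a single vector. To make that structure visible, a small inspection helper could look like this sketch. The name `summarize_multi_vectors` is hypothetical, and the helper assumes that all vectors of one image share the same length.

```python
def summarize_multi_vectors(file_to_multi_vectors):
    """Map each file name to (number of vectors, dimensions per vector)."""
    summary = {}
    for file_name, vectors in file_to_multi_vectors.items():
        # Use the first vector as representative of the dimensionality.
        dims = len(vectors[0]) if vectors else 0
        summary[file_name] = (len(vectors), dims)
    return summary
```

Running it on the dictionary built above shows how many vectors the model produced per image and how wide each vector is.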


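We create an index with a `rank_vectors` field, the new field type in which we will store our ColPali vectors. We also define a helper that indexes a document and retries with exponential backoff if the request fails.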

In [5]:
from dotenv import load_dotenv
from elasticsearch import Elasticsearch

load_dotenv("elastic.env")

ELASTIC_API_KEY = os.getenv("ELASTIC_API_KEY")
ELASTIC_HOST = os.getenv("ELASTIC_HOST")
INDEX_NAME = "searchlabs-colpali"

es = Elasticsearch(ELASTIC_HOST, api_key=ELASTIC_API_KEY)

mappings = {"mappings": {"properties": {"col_pali_vectors": {"type": "rank_vectors"}}}}

if not es.indices.exists(index=INDEX_NAME):
    print(f"[INFO] Creating index: {INDEX_NAME}")
    es.indices.create(index=INDEX_NAME, body=mappings)
else:
    print(f"[INFO] Index '{INDEX_NAME}' already exists.")


def index_document(es_client, index, doc_id, document, retries=10, initial_backoff=1):
    for attempt in range(1, retries + 1):
        try:
            return es_client.index(index=index, id=doc_id, document=document)
        except Exception as e:
            if attempt < retries:
                wait_time = initial_backoff * (2 ** (attempt - 1))
                print(f"[WARN] Failed to index {doc_id} (attempt {attempt}): {e}")
                time.sleep(wait_time)
            else:
                print(f"Failed to index {doc_id} after {retries} attempts: {e}")
                raise

[INFO] Index 'searchlabs-colpali' already exists.
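
To see how long `index_document` can wait in the worst case, the backoff formula `initial_backoff * (2 ** (attempt - 1))` can be evaluated on its own. The sketch below reproduces only that arithmetic. The helper `backoff_schedule` is hypothetical and sends no requests.

```python
def backoff_schedule(retries=10, initial_backoff=1):
    """Return the sleep times (in seconds) between failed attempts.

    The last attempt raises instead of sleeping, so there are
    retries - 1 waits in total.
    """
    return [initial_backoff * (2 ** (attempt - 1)) for attempt in range(1, retries)]


schedule = backoff_schedule()
print(schedule)       # waits between attempts, in seconds
print(sum(schedule))  # worst-case total wait, in seconds
```

With the defaults, the waits double from 1 second up to 256 seconds, for a worst-case total of 511 seconds per document before the exception is re-raised.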


Load the saved vectors back from disk and index them into Elasticsearch. Documents that already exist in the index are skipped.

In [6]:
with open("col_pali_vectors.pkl", "rb") as f:
    file_to_multi_vectors = pickle.load(f)

for file_name, vectors in tqdm(file_to_multi_vectors.items(), desc="Index documents"):
    if es.exists(index=INDEX_NAME, id=file_name):
        continue

    index_document(
        es_client=es,
        index=INDEX_NAME,
        doc_id=file_name,
        document={"col_pali_vectors": vectors},
    )

Index documents:   0%|          | 0/500 [00:00<?, ?it/s]
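
The loop above sends one request per document. As an alternative sketch, the same documents could be shaped as actions for the client's `helpers.bulk` function. The generator name `generate_bulk_actions` is hypothetical. Unlike the loop above, this variant does not skip existing documents or retry with backoff, so treat it as an assumption-laden illustration rather than a drop-in replacement.

```python
def generate_bulk_actions(index_name, file_to_multi_vectors):
    """Yield one bulk action per image, using the file name as document id."""
    for file_name, vectors in file_to_multi_vectors.items():
        yield {
            "_index": index_name,
            "_id": file_name,
            "_source": {"col_pali_vectors": vectors},
        }


# Possible usage with the client created earlier (not executed here):
# from elasticsearch import helpers
# helpers.bulk(es, generate_bulk_actions(INDEX_NAME, file_to_multi_vectors))
```

Because multi-vector documents are large, bulk requests may need smaller chunks than the default.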

We use the new `maxSimDotProduct` function in a `script_score` query to score every document by the similarity between our query vectors and its image vectors, then display the five best-matching images.

In [7]:
from IPython.display import display, HTML
import os

query = "What do companies use for recruiting?"
es_query = {
    "_source": False,
    "query": {
        "script_score": {
            "query": {"match_all": {}},
            "script": {
                "source": "maxSimDotProduct(params.query_vector, 'col_pali_vectors')",
                "params": {"query_vector": create_col_pali_query_vectors(query)},
            },
        }
    },
    "size": 5,
}

results = es.search(index=INDEX_NAME, body=es_query)
image_ids = [hit["_id"] for hit in results["hits"]["hits"]]

html = "<div style='display: flex; flex-wrap: wrap; align-items: flex-start;'>"
for image_id in image_ids:
    image_path = os.path.join(DOCUMENT_DIR, image_id)
    html += f'<img src="{image_path}" alt="{image_id}" style="max-width:300px; height:auto; margin:10px;">'
html += "</div>"

display(HTML(html))
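
For intuition about the score used in the query above, late-interaction (MaxSim) scoring is commonly defined as follows: for every query vector, take the largest dot product against any document vector, then sum those maxima. The pure-Python sketch below illustrates that definition on toy data. It is a conceptual illustration, not Elasticsearch's implementation, and the function name `max_sim_dot_product` is hypothetical.

```python
def max_sim_dot_product(query_vectors, doc_vectors):
    """Sum, over all query vectors, the best dot product with any doc vector."""
    score = 0.0
    for q in query_vectors:
        # Best-matching document vector for this query vector.
        score += max(sum(qi * di for qi, di in zip(q, d)) for d in doc_vectors)
    return score


toy_query = [[1.0, 0.0], [0.0, 1.0]]
toy_doc = [[0.5, 0.5], [1.0, 0.0], [0.0, 2.0]]
print(max_sim_dot_product(toy_query, toy_doc))  # 1.0 + 2.0 = 3.0
```

In the toy example, the first query vector matches the second document vector best (dot product 1.0) and the second query vector matches the third (dot product 2.0), giving a total of 3.0.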

In [None]:
# We kill the kernel forcefully to free up the memory from the ColPali model.
print("Shutting down the kernel to free memory...")
import os

os._exit(0)