# Resumable backup and restore for very large indexes

**This unofficial code sample is offered "as-is" and might not work for all customers and scenarios. If you run into difficulties, you should manually recreate and reload your search index on a new search service.**

If your indexes contain more than 100,000 documents, use this sample code to move your index onto a new search service. In contrast with the simple [backup and restore code](https://github.com/Azure/azure-search-vector-samples/tree/main/demo-python/code/utilities/index-backup-restore) that uses document paging to assemble the backup, this code uses timestamps to create a sorted list of documents for the backup. The code then uses filters to batch and move documents from one index to another.

> **Note**: Azure AI Search now supports [service upgrades](https://learn.microsoft.com/azure/search/search-how-to-upgrade) and [pricing tier changes](https://learn.microsoft.com/azure/search/search-capacity-planning#change-your-pricing-tier). If you're backing up and restoring your index for migration to a higher capacity service, you now have other options.

This code requires a [timestamp field](https://learn.microsoft.com/rest/api/searchservice/supported-data-types#edm-data-types-for-nonvector-fields) that indicates when a document was created or last updated. The field must be [filterable](https://learn.microsoft.com/azure/search/search-filters) and [sortable](https://learn.microsoft.com/azure/search/query-odata-filter-orderby-syntax). If you update this timestamp every time you update a document in your index, you have a built-in record of when each document last changed, and you can use the field to implement a resumable backup and restore. The most recently backed-up timestamp can be recorded, so a backup can pause at that timestamp and resume later.
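
As a rough illustration of the resume idea, the following sketch simulates the "filter by timestamp, sort ascending, remember the last timestamp" loop against an in-memory list instead of a search service. The function name `resumable_batches`, the `modified` field, and the sample documents are hypothetical and exist only for this example.

```python
from typing import Iterator, List, Optional

def resumable_batches(docs: List[dict], ts_field: str, batch_size: int,
                      resume_from: Optional[str] = None) -> Iterator[List[dict]]:
    """Yield batches sorted by timestamp, starting at resume_from (inclusive)."""
    # In-memory stand-in for a "<ts_field> ge <resume_from>" filter
    # combined with an ascending sort on the timestamp field.
    selected = sorted(
        (d for d in docs if resume_from is None or d[ts_field] >= resume_from),
        key=lambda d: d[ts_field],
    )
    for i in range(0, len(selected), batch_size):
        yield selected[i:i + batch_size]

# Hypothetical sample data: six documents, one second apart
docs = [{"id": str(i), "modified": f"2024-01-01T00:00:{i:02d}.000Z"} for i in range(6)]

# First run: copy one batch, record the checkpoint, then stop.
first = next(resumable_batches(docs, "modified", batch_size=4))
checkpoint = first[-1]["modified"]

# Later run: resume from the checkpoint. The boundary document is read again
# because the filter is inclusive, so the copy step should be idempotent.
resumed = [d["id"] for batch in resumable_batches(docs, "modified", 4, checkpoint) for d in batch]
print(checkpoint, resumed)
```

Because the comparison is inclusive, the document at the checkpoint is copied twice. That is harmless as long as writing the same document again simply overwrites it.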

You can also run parallel backup jobs to increase backup speed by setting `desired_partitions` and `backup_tasks` to values greater than 1. When using parallel backup jobs, consider the following limitations:

* If documents are added to the index or existing documents are modified during the backup, modified or new documents are not included in the backup as they have a more recent timestamp than when the backup started.

* Deletes during the backup may not be propagated to the backup copy of the index. Deleting documents during a backup is not recommended.
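
To make the idea of partitions concrete, here is a minimal sketch that splits a timestamp range into inclusive, non-overlapping windows, each of which could be handled by a separate backup job. It is a simplification: it splits by elapsed time only, whereas the notebook code tries to balance partitions by document count. The function name `split_time_range` is hypothetical, and the sketch assumes the range is much longer than `parts` milliseconds.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

def split_time_range(start: datetime, end: datetime, parts: int) -> List[Tuple[datetime, datetime]]:
    """Split [start, end] into `parts` inclusive, non-overlapping windows."""
    step = (end - start) / parts
    windows = []
    lower = start
    for i in range(parts):
        upper = end if i == parts - 1 else start + step * (i + 1)
        # Truncate to whole milliseconds so that "upper" and "upper + 1 ms"
        # leave no gap for timestamps stored with millisecond precision.
        upper = upper.replace(microsecond=upper.microsecond // 1000 * 1000)
        windows.append((lower, upper))
        # The next window starts 1 millisecond later to avoid overlap
        lower = upper + timedelta(milliseconds=1)
    return windows

windows = split_time_range(datetime(2024, 1, 1), datetime(2024, 1, 2), parts=4)
for lower, upper in windows:
    print(lower.isoformat(timespec="milliseconds"), "->", upper.isoformat(timespec="milliseconds"))
```

Each window is fixed when the split is computed, which is why documents added or modified after that point fall outside every window.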

## Install packages

In [1]:
! pip install -r requirements.txt --quiet

## Load environment variables

In [7]:
from dotenv import load_dotenv
from azure.identity.aio import DefaultAzureCredential
from azure.core.credentials import AzureKeyCredential
import os

# Copy sample.env to .env and change the variables for your service
load_dotenv(override=True)

# The sample.env contains more variables than are needed for this code. Ignore any variables not used here.
# Provide a search service containing the source index for the backup operation
source_endpoint = os.environ["AZURE_SEARCH_SOURCE_SERVICE_ENDPOINT"]
# Provide an admin API key if you're using key-based authentication. Using a key is optional. See https://learn.microsoft.com/azure/search/keyless-connections
source_credential = AzureKeyCredential(os.getenv("AZURE_SEARCH_SOURCE_ADMIN_KEY")) if os.getenv("AZURE_SEARCH_SOURCE_ADMIN_KEY") else DefaultAzureCredential()
# Provide a second search service as the destination for the new restored index
destination_endpoint = os.environ["AZURE_SEARCH_DESTINATION_SERVICE_ENDPOINT"]
destination_credential = AzureKeyCredential(os.getenv("AZURE_SEARCH_DESTINATION_ADMIN_KEY")) if os.getenv("AZURE_SEARCH_DESTINATION_ADMIN_KEY") else DefaultAzureCredential()
# Name of the index to be backed up
index_name = os.environ["AZURE_SEARCH_INDEX"]
# Name of the timestamp field 
timestamp_field_name = os.environ["AZURE_SEARCH_TIMESTAMP_FIELD"]

In [None]:
from azure.search.documents.indexes.aio import SearchIndexClient
from azure.search.documents.aio import SearchClient
from azure.search.documents.indexes.models import BinaryQuantizationCompression, SearchField
from datetime import datetime, timedelta
from uuid import uuid4
import random

enable_compression = False

# Copies an index definition from the source service to the destination
async def copy_index_definition(source_index_client: SearchIndexClient, destination_index_client: SearchIndexClient, index_name: str):
    index = await source_index_client.get_index(index_name)
    # Check for any synonym maps
    synonym_map_names = []
    for field in index.fields:
        if field.synonym_map_names:
            synonym_map_names.extend(field.synonym_map_names)
    
    # Copy over synonym maps if they exist
    for synonym_map_name in synonym_map_names:
        synonym_map = await source_index_client.get_synonym_map(synonym_map_name)
        await destination_index_client.create_or_update_synonym_map(synonym_map)

    if enable_compression:
        for profile in index.vector_search.profiles:
            if not profile.compression_name:
                profile.compression_name = "mycompression"
        
        index.vector_search.compressions.append(
            BinaryQuantizationCompression(
                compression_name="mycompression",
                rerank_with_original_vectors=True,
                default_oversampling=10
            ))
    
    # Copy over the index
    await destination_index_client.create_or_update_index(index)

# Method to convert a datetime to a timestamp string
def datetime_to_timestamp(date: datetime) -> str:
    # Trim microseconds to milliseconds. Timestamp precision is to milliseconds only. See https://learn.microsoft.com/rest/api/searchservice/supported-data-types#edm-data-types-for-nonvector-fields for more information
    return date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-4] + "Z"

def get_random_timestamp(start_time: datetime, end_time: datetime) -> str:
    delta = end_time - start_time
    random_seconds = random.randint(0, int(delta.total_seconds()))
    return datetime_to_timestamp(start_time + timedelta(seconds=random_seconds))

# Add a timestamp field to the index
async def add_timestamp_to_index(source_index_client: SearchIndexClient, source_client: SearchClient, index_name: str, timestamp_field_name: str, start_timestamp: datetime, end_timestamp: datetime):
    index = await source_index_client.get_index(index_name)
    timestamp_field_added = False
    key_field = None
    for field in index.fields:
        if not key_field and field.key:
            key_field = field
        if field.name == timestamp_field_name:
            timestamp_field_added = True

    if not timestamp_field_added:
        index.fields.append(SearchField(name=timestamp_field_name, type="Edm.DateTimeOffset", facetable=False, filterable=True, sortable=True, hidden=False))

    await source_index_client.create_or_update_index(index)

    # Create a session when paging through results to ensure consistency in multi-replica services
    # For more information, please see https://learn.microsoft.com/azure/search/index-similarity-and-scoring#scoring-statistics-and-sticky-sessions
    session_id = str(uuid4())
    get_next_results = True
    while get_next_results:
        total_results_size = 0
        filter = f"{timestamp_field_name} eq null"
        results = await source_client.search(
            search_text="*",
            top=100000,
            filter=filter,
            session_id=session_id,
            select=[key_field.name]
        )

        results_by_page = results.by_page()
        async for page in results_by_page:
            # Add a timestamp to this page of results
            update_page = [{ key_field.name: item[key_field.name], timestamp_field_name: get_random_timestamp(start_timestamp, end_timestamp) } async for item in page]
            if len(update_page) > 0:
                await source_client.merge_documents(update_page)
            total_results_size += len(update_page)
        
        # If any results were returned, there may be more documents without a timestamp
        # Continue the search
        get_next_results = total_results_size > 0


## (Optional) Add a timestamp column

If you don't have a timestamp column to use for resuming, you can add one by generating new timestamps. Try to distribute these timestamps evenly across your index, because the timestamp distribution affects how evenly the index can be split into partitions.
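
The notebook assigns random timestamps within a time window. As an alternative illustration of an even distribution, this sketch generates uniformly spaced timestamps instead. The function name `evenly_spaced_timestamps` is hypothetical, `count` is assumed to be greater than zero, and the trailing `Z` assumes the naive datetimes are meant as UTC.

```python
from datetime import datetime
from typing import List

def evenly_spaced_timestamps(count: int, start: datetime, end: datetime) -> List[str]:
    """Return `count` timestamps spread uniformly across [start, end)."""
    step = (end - start) / count
    return [
        # timespec="milliseconds" truncates to millisecond precision
        (start + step * i).isoformat(timespec="milliseconds") + "Z"
        for i in range(count)
    ]

stamps = evenly_spaced_timestamps(4, datetime(2024, 1, 1), datetime(2024, 1, 2))
print(stamps)
```

With either approach, the goal is to avoid large clusters of documents that share the same timestamp.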

In [None]:
from datetime import datetime, time

async with SearchIndexClient(endpoint=source_endpoint, credential=source_credential) as source_index_client, SearchClient(endpoint=source_endpoint, credential=source_credential, index_name=index_name) as source_client:
    now = datetime.now()
    start_of_day = datetime.combine(now.date(), time.min)
    end_of_day = datetime.combine(now.date(), time.max)

    await add_timestamp_to_index(source_index_client, source_client, index_name, timestamp_field_name, start_timestamp=start_of_day, end_timestamp=end_of_day)


## Copy index definition
Copy the source index definition to the destination service.

In [4]:
source_index_client = SearchIndexClient(endpoint=source_endpoint, credential=source_credential)
destination_index_client = SearchIndexClient(endpoint=destination_endpoint, credential=destination_credential)

await copy_index_definition(source_index_client, destination_index_client, index_name)

In [5]:
from azure.search.documents.indexes.aio import SearchIndexClient
from azure.search.documents.indexes.models import SearchFieldDataType
from typing import List

# Method to validate the timestamp field exists, is filterable, and is sortable
async def validate_resume_backup_and_restore(index_client: SearchIndexClient, index_name: str, timestamp_field_name: str) -> bool:
    index = await index_client.get_index(index_name)

    found_field = False
    for field in index.fields:
        if field.name == timestamp_field_name:
            found_field = True
            if field.type != SearchFieldDataType.DateTimeOffset:
                # Field must be a timestamp
                return False
            if not field.filterable:
                # Field must be filterable
                return False
            if not field.sortable:
                # Field must be sortable
                return False
            break
    
    # Field must exist on the index
    return found_field

# Method to validate which fields can and cannot be backed up
async def validate_fields_backup_and_restore(index_client: SearchIndexClient, index_name: str) -> List[str]:
    missing_fields = []
    index = await index_client.get_index(index_name)
    for field in index.fields:
        message = ""
        # Complex fields are not marked as stored - skip
        if not field.stored and not field.fields:
            message += f"Field {field.name} cannot be backed up because it's not marked as stored\n"
        elif field.hidden: 
            message += f"Field {field.name} cannot be backed up because it's not marked as retrievable\n"
        
        if message:
            missing_fields.append(message)
    
    return missing_fields


## Validate backup and restore

* Make sure the timestamp field is filterable and sortable.
* If a field is not marked as [stored](https://learn.microsoft.com/azure/search/vector-search-how-to-storage-options), it cannot be backed up.
* If a field is not marked as [retrievable](https://learn.microsoft.com/azure/search/search-pagination-page-layout#result-composition), it won't be backed up.
  * This setting may be changed if the field is marked as stored.
  * If the field was not marked as stored, it cannot be marked as retrievable.

In [None]:
can_resume_backup_and_restore = await validate_resume_backup_and_restore(source_index_client, index_name, timestamp_field_name)
if can_resume_backup_and_restore:
    print("Index has a valid timestamp field and can use resumable backup and restore")
else:
    print("Index does not have a valid timestamp field and cannot use resumable backup and restore")

missing_fields_messages = await validate_fields_backup_and_restore(source_index_client, index_name)
for message in missing_fields_messages:
    print(message)

In [None]:
from azure.search.documents.aio import SearchClient
from typing import Optional, AsyncGenerator, List, Callable, Tuple
from tqdm.notebook import tqdm
import ipywidgets as widgets
from uuid import uuid4
import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from copy import deepcopy
import os
import json
import re

# Class representing a partition, a subset of an index that can be used to create parallel backup jobs
@dataclass
class Partition:
    id: int
    start: str
    end: str
    last: Optional[str] = None

# Method to check how many documents are remaining in an index. The check can be scoped down to a single part of an index by timestamp
async def get_total_documents_remaining(client: SearchClient, timestamp_field_name: str, min_timestamp: Optional[str] = None, max_timestamp: Optional[str] = None) -> int:
    filter = None
    if min_timestamp and not max_timestamp:
        # If a minimum timestamp is specified, check all documents greater than or equal to this timestamp
        filter = f"{timestamp_field_name} ge {min_timestamp}"
    elif min_timestamp and max_timestamp:
        # If minimum and maximum timestamps are specified, check all documents between these timestamps
        filter = f"{timestamp_field_name} ge {min_timestamp} and {timestamp_field_name} le {max_timestamp}"
    results = await client.search(
        search_text="*",
        include_total_count=True,
        filter=filter,
        top=0
    )
    return await results.get_count()

# Method to find either the minimum or maximum timestamp in an index
async def get_timestamp_bound(client: SearchClient, timestamp_field_name: str, max: bool) -> Optional[str]:
    result = await client.search(
        search_text="*",
        order_by=f"{timestamp_field_name} {'desc' if max else 'asc'}",
        top=1,
        select=[timestamp_field_name]
    )
    result = [item async for item in result]
    if len(result) == 0:
        return None
    return result[0][timestamp_field_name]

# Methods to convert a timestamp to and from datetime
def timestamp_to_datetime(timestamp: str) -> datetime:
    return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
def datetime_to_timestamp(date: datetime) -> str:
    # Trim microseconds to milliseconds. Timestamp precision is to milliseconds only. See https://learn.microsoft.com/rest/api/searchservice/supported-data-types#edm-data-types-for-nonvector-fields for more information
    return date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-4] + "Z"

# Method to get bounds of partitions for parallel backup jobs.
# Set desired_partitions to 1 to disable parallel backup jobs
async def get_partition_bounds(client: SearchClient, timestamp_field_name: str, desired_partitions: int = 2, partition_size_threshold: float = 0.05, min_timestamp: Optional[str] = None, max_timestamp: Optional[str] = None) -> List[datetime]:
    # Determine the minimum and maximum timestamps to backup. Default to taking them from the index
    if max_timestamp is None:
        max_timestamp = await get_timestamp_bound(client, timestamp_field_name, max=True)
        if max_timestamp is None:
            return []
    if min_timestamp is None:
        min_timestamp = await get_timestamp_bound(client, timestamp_field_name, max=False)

    # If there's only 1 timestamp or parallel backup jobs are disabled, do not partition
    if min_timestamp == max_timestamp or desired_partitions == 1:
        return []

    # Attempt to divide the index into roughly equally sized partitions
    # Partitions are not guaranteed to be of a similar size. The timestamp distribution of data in your index affects the size of each partition
    partition_splits = []
    low = timestamp_to_datetime(min_timestamp)
    for partition in range(desired_partitions - 1):
        high = timestamp_to_datetime(max_timestamp)
        remaining_partitions = desired_partitions - partition
        # Determine the goal size a partition should be. This is the total amount of unpartitioned documents over the number of partitions left to create
        # Partitions may be different sizes, specify a target percentage this partition size may be different from the target size
        # For example, it may be acceptable for partitions to be 8% larger than another partition
        # Unevenly sized partitions may affect the speed of the parallel backup jobs
        target_partition_size = await get_total_documents_remaining(client, timestamp_field_name, min_timestamp=datetime_to_timestamp(low)) // remaining_partitions
        partition_threshold = target_partition_size * partition_size_threshold
        # If an optimal partition size cannot be picked, track all the potential partition sizes to pick the best one
        partition_sizes = []

        # Perform a modified binary search to determine the bounds of the partition
        best_split = None
        mid = low + (high - low) / 2
        while low <= high:
            current_partition_size = await get_total_documents_remaining(client, timestamp_field_name, datetime_to_timestamp(low), datetime_to_timestamp(mid))
            partition_sizes.append((mid, current_partition_size))
            # Check if the partition is an acceptable size. If it's not, continue the binary search
            if current_partition_size < target_partition_size + partition_threshold and current_partition_size > target_partition_size - partition_threshold:
                best_split = mid
                break
            elif current_partition_size < target_partition_size:
                mid = mid + (high - mid) / 2
            else:
                prev_high = high
                high = mid
                mid = mid - (mid - low) / 2
                if prev_high == high:
                    # No progress being made
                    best_split = None
                    break
        
        # If an acceptable partition could not be found, pick the one that has the closest size
        if best_split is None:
            min_difference = -1
            for split, partition_size in partition_sizes:
                difference = abs(target_partition_size - partition_size)
                if min_difference == -1 or difference < min_difference:
                    best_split = split
                    min_difference = difference

        if best_split:
            partition_splits.append(best_split)
            low = best_split + timedelta(milliseconds=1)
        else:
            # Cannot partition anymore, exit
            partition_splits.append(low)
            break

    return partition_splits

# Method to create partitions for parallel backup jobs
# Requires using the bounds from the previous method
async def get_partitions(client: SearchClient, timestamp_field_name: str, partition_splits: List[datetime], start_id: int = 0, min_timestamp: Optional[str] = None, max_timestamp: Optional[str] = None) -> List[Partition]:
    # The minimum and maximum timestamps in the source index are part of the partition bounds
    if max_timestamp is None:
        max_timestamp = await get_timestamp_bound(client, timestamp_field_name, max=True)
        if max_timestamp is None:
            return []
    if min_timestamp is None:
        min_timestamp = await get_timestamp_bound(client, timestamp_field_name, max=False)

    # Create a new partition for every pair of bounds
    prev_partition_end = timestamp_to_datetime(min_timestamp)
    partitions = []
    for i, partition_split in enumerate(partition_splits):
        partitions.append(Partition(id=start_id + i, start=datetime_to_timestamp(prev_partition_end), end=datetime_to_timestamp(partition_split), last=None))
        # The next partition starts 1 millisecond after the previous one to avoid overlap
        prev_partition_end = partition_split + timedelta(milliseconds=1)
    partitions.append(Partition(id=start_id + len(partition_splits), start=datetime_to_timestamp(prev_partition_end), end=max_timestamp, last=None))
    return partitions

# Resume fetching search results from a source index for backup.
# May have timestamp bounds if resuming from a previous backup job or using parallel backup jobs
async def resume_backup_results(client: SearchClient, timestamp_field_name: str, timestamp: Optional[str], max_timestamp: Optional[str] = None, select=None) -> AsyncGenerator[List[dict], None]:
    # Create a session when paging through results to ensure consistency in multi-replica services
    # For more information, please see https://learn.microsoft.com/azure/search/index-similarity-and-scoring#scoring-statistics-and-sticky-sessions
    session_id = str(uuid4())
    # The maximum number of results from a single search query is 100,000. This can be exceeded by using sorting and filtering
    # For more information, please see https://learn.microsoft.com/azure/search/search-pagination-page-layout#paging-through-a-large-number-of-results
    max_results_size = 100000
    get_next_results = True
    while get_next_results:
        total_results_size = 0
        filter = None
        if timestamp and not max_timestamp:
            # If using a single timestamp, find all records greater than or equal to it
            filter = f"{timestamp_field_name} ge {timestamp}"
        elif timestamp and max_timestamp:
            # If using a minimum and maximum timestamp, find all records between them
            filter = f"{timestamp_field_name} ge {timestamp} and {timestamp_field_name} le {max_timestamp}"
        results = await client.search(
            search_text="*",
            order_by=f"{timestamp_field_name} asc",
            top=max_results_size,
            filter=filter,
            session_id=session_id,
            select=select
        )
        results_by_page = results.by_page()

        async for page in results_by_page:
            next_page = [item async for item in page]
            # Count how many results are returned
            total_results_size += len(next_page)
            if len(next_page) == 0:
                break
            yield next_page
            timestamp = next_page[-1][timestamp_field_name]
        
        # If the maximum number of results was returned, there may be more results after the last timestamp searched
        # Continue the search using the most recent timestamp
        get_next_results = total_results_size == max_results_size

# Method to initiate a backup of a search service
# The number of partitions (whether to use parallel backup jobs) and number of parallel backup uploads is configurable
# The strategy used to save partition state is configurable using on_backup_page
async def backup_index_with_resume(client: SearchClient, destination_client: SearchClient, timestamp_field_name: str, partitions: List[Partition], backup_tasks:int = 2, on_backup_page: Optional[Callable[[Partition], None]] = None) -> None:
    total_documents = 0
    total_partitions = len(partitions)
    for partition in partitions:
        total_documents += await get_total_documents_remaining(client, timestamp_field_name, partition.last or partition.start, partition.end)
    if total_documents == 0:
        return
    
    # Create a progress bar to visualize backup progress
    # Create a label to track how many result pages are waiting for backup
    progress_bar = tqdm(total=total_documents, desc="Backing up documents...", unit="docs", unit_scale=False)
    pages_label = widgets.Label(value="Queued Result Pages: 0")
    display(pages_label)
    
    # Method to fetch all the search results for a backup job and queue them for backup
    async def get_results(partition: Partition, results_queue: asyncio.Queue):
        try:
            results = resume_backup_results(client, timestamp_field_name, timestamp=partition.last or partition.start, max_timestamp=partition.end)
            async for result_page in results:
                await results_queue.put((partition, result_page))
            await results_queue.put((partition, None))
        except asyncio.CancelledError:
            raise
    
    # Track how many parallel backup jobs have finished
    finished_partitions = 0
    finished_partitions_lock = asyncio.Lock()

    # Track backup job tasks
    backup_task_list = []

    # Method to fetch search results from a backup queue and back them up
    async def backup_results(results_queue: asyncio.Queue, partition_update_queue: asyncio.Queue):
        nonlocal finished_partitions
        try:
            while True:
                partition, result_page = await results_queue.get()
                if partition is None:
                    # Exit
                    break

                if result_page is None:
                    # The backup job completed. If all backup jobs have completed, exit
                    async with finished_partitions_lock:
                        finished_partitions += 1
                        if finished_partitions >= total_partitions:
                            # Ensure checkpoint job ends
                            await partition_update_queue.put(None)
                            # Ensure backup jobs end
                            for _ in backup_task_list:
                                await results_queue.put((None, None))
                            progress_bar.n = total_documents
                            progress_bar.refresh()
                    break
                
                # Update the partition state with the most recently completed backup
                saved_timestamp = result_page[-1][timestamp_field_name]
                partition.last = saved_timestamp

                # Back up the search results and queue an update to the partition
                await destination_client.upload_documents(result_page)
                await partition_update_queue.put(deepcopy(partition))
                if progress_bar.n < progress_bar.total:
                    progress_bar.update(len(result_page))
        except asyncio.CancelledError:
            raise
    
    # Helper method to save a partition's state if it's been updated
    async def checkpoint_results(partition_update_queue: asyncio.Queue, output_queue: asyncio.Queue):
        partition_max_timestamps = {}
        try:
            while True:
                partition = await partition_update_queue.get()
                if partition is None:
                    # No more updates, all backup jobs finished
                    break
                pages_label.value=f"Queued Result Pages: {output_queue.qsize()}"

                # Only update this partition if this is the most recently processed update to the partition
                max_timestamp = partition_max_timestamps.get(partition.id)
                last_timestamp = timestamp_to_datetime(partition.last)
                if not max_timestamp or last_timestamp >= max_timestamp:
                    partition_max_timestamps[partition.id] = last_timestamp
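                    # on_backup_page is optional, so only call it when a callback was provided
                    if on_backup_page is not None:
                        on_backup_page(partition)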
        except asyncio.CancelledError:
            raise

    results_queue = asyncio.Queue()
    partition_update_queue = asyncio.Queue()

    # Run producer and consumer concurrently
    result_task_list = [asyncio.create_task(get_results(partition, results_queue)) for partition in partitions]
    backup_task_list.extend([asyncio.create_task(backup_results(results_queue, partition_update_queue)) for _ in range(backup_tasks)])
    checkpoint_task = asyncio.create_task(checkpoint_results(partition_update_queue, results_queue))

    # Wait for all tasks to complete
    try:
        await asyncio.gather(*result_task_list)
        await asyncio.gather(*backup_task_list)
        await checkpoint_task
    except asyncio.CancelledError:
        for task in result_task_list:
            task.cancel()
        for task in backup_task_list:
            task.cancel()
        checkpoint_task.cancel()
        await asyncio.gather(*result_task_list, return_exceptions=True)
        await asyncio.gather(*backup_task_list, return_exceptions=True)
        try:
            await checkpoint_task
        except asyncio.CancelledError:
            pass

# Example implementation to store backup job state
# Saves partition JSON to files in the "partitions" directory
backup_state_directory = "partitions"
backup_format = os.path.join(backup_state_directory, "backup-{partition}.json")
if not os.path.exists(backup_state_directory):
    os.makedirs(backup_state_directory)
def on_backup_page(partition: Partition) -> None:
    with open(backup_format.format(partition=partition.id), "w") as f:
        json.dump(asdict(partition), f, indent=2)

# Restore partition JSON from files in the "partitions" directory
def read_backups_state(directory: str) -> List[Partition]:
    if not os.path.isdir(directory):
        return []
    partitions = []
    for file in os.listdir(directory):
        if re.match(r'backup-\d+\.json', file):
            with open(os.path.join(directory, file), "r") as f:
                data = json.load(f)
                partitions.append(Partition(**data))

    return partitions

# Create incremental partition files from a previous backup job
async def create_incremental_backup_partitions(client: SearchClient, timestamp_field_name: str, partitions: List[Partition], desired_partitions: int = 1, partition_size_threshold: float = 0.05, max_timestamp: Optional[str] = None) -> List[Partition]:
    if not max_timestamp:
        max_timestamp = await get_timestamp_bound(client, timestamp_field_name, max=True)
    min_timestamp = None
    last_id = None
    for partition in partitions:
        # Track the highest partition id seen so far (an id of 0 is valid)
        if last_id is None or partition.id > last_id:
            last_id = partition.id
        if not min_timestamp:
            min_timestamp = partition.last
        elif partition.last:
            if timestamp_to_datetime(partition.last) > timestamp_to_datetime(min_timestamp):
                min_timestamp = partition.last
    
    partition_splits = await get_partition_bounds(client, timestamp_field_name, desired_partitions, partition_size_threshold, min_timestamp)
    partitions = await get_partitions(client, timestamp_field_name, partition_splits, start_id=last_id + 1, min_timestamp=min_timestamp, max_timestamp=max_timestamp)
    return partitions


## Initiate the backup

* Set `desired_partitions` to a value greater than 1 to set up parallel backup jobs.
* Change `backup_tasks` to set how many parallel backup workers upload results from the source service to the destination index.
* Changing `desired_partitions` and `backup_tasks` will change the speed of the backup.
  * Services with more [replicas](https://learn.microsoft.com/azure/search/search-capacity-planning#concepts-search-units-replicas-partitions) or a higher [SKU](https://learn.microsoft.com/azure/search/search-sku-tier) may benefit from a higher number of parallel backup jobs and parallel backup workers.
* Use `create_incremental_backup_partitions` to resume from a previous backup job if records have been added or updated.
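
The relationship between result pages and `backup_tasks` workers can be sketched with a plain `asyncio.Queue`. This is a simplified stand-in for the notebook's pipeline, not a copy of it: `run_backup` and its integer "pages" are hypothetical, and the upload is replaced by a no-op `asyncio.sleep(0)`.

```python
import asyncio
from typing import List

async def run_backup(pages: List[List[int]], backup_tasks: int) -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    copied: List[int] = []

    async def producer() -> None:
        # Queue every page of results for the workers
        for page in pages:
            await queue.put(page)
        # One sentinel per worker so that every worker exits
        for _ in range(backup_tasks):
            await queue.put(None)

    async def worker() -> None:
        while True:
            page = await queue.get()
            if page is None:
                break
            await asyncio.sleep(0)  # stand-in for uploading the page
            copied.extend(page)

    await asyncio.gather(producer(), *(worker() for _ in range(backup_tasks)))
    return copied

# In a notebook, use "await run_backup(...)" instead of asyncio.run(...)
result = asyncio.run(run_backup([[1, 2], [3, 4], [5]], backup_tasks=2))
print(sorted(result))
```

More workers only help while there are queued pages to upload, so the producer side (the number of partitions) and the consumer side (`backup_tasks`) are usually tuned together.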


In [8]:
source_client = SearchClient(source_endpoint, index_name, source_credential)
destination_client = SearchClient(destination_endpoint, index_name, destination_credential)

In [None]:
desired_partitions = 1
partitions = None
incremental_backup = True
incremental_partitions = None
if desired_partitions == 1:
    # Resume backup from the last timestamp in the destination index
    incremental_partitions = [
        Partition(
            id=0,
            start=await get_timestamp_bound(destination_client, timestamp_field_name, max=True),
            end=await get_timestamp_bound(source_client, timestamp_field_name, max=True),
            last=None
        )
    ]
else:
    partitions = read_backups_state(backup_state_directory)

if not partitions:
    partition_splits = await get_partition_bounds(source_client, timestamp_field_name, desired_partitions=desired_partitions)
    partitions = await get_partitions(source_client, timestamp_field_name, partition_splits)
elif incremental_backup and not incremental_partitions and partitions:
    incremental_partitions = await create_incremental_backup_partitions(source_client, timestamp_field_name, partitions, desired_partitions=desired_partitions)

await backup_index_with_resume(
    source_client,
    destination_client,
    timestamp_field_name,
    partitions=incremental_partitions or partitions,
    on_backup_page=on_backup_page,
    backup_tasks=desired_partitions * 2
)