misc/batch_processing.ipynb (460 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Batch Processing with Message Batches API\n",
"\n",
"Message Batches allow you to process large volumes of Messages requests asynchronously and cost-effectively. This cookbook demonstrates how to use the Message Batches API to handle bulk operations while reducing costs by 50%.\n",
"\n",
"In this cookbook, we will demonstrate how to:\n",
"\n",
"1. Create and submit message batches\n",
"2. Monitor batch processing status\n",
"3. Retrieve and handle batch results\n",
"4. Implement best practices for effective batching"
]
},
{
"cell_type": "markdown",
"metadata": {
"vscode": {
"languageId": "plaintext"
}
},
"source": [
"## Setup\n",
"\n",
"First, let's set up our environment with the necessary imports:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install anthropic"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import anthropic\n",
"import time\n",
"\n",
"client = anthropic.Anthropic()\n",
"MODEL_NAME = \"claude-3-5-sonnet-20241022\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example 1: Basic Batch Processing\n",
"\n",
"Let's start with a simple example that demonstrates creating and monitoring a batch of message requests."
]
},
{
"cell_type": "code",
"execution_count": 68,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Batch ID: msgbatch_01GgqTz9XzriGNHzTSGZsJJ8\n",
"Status: in_progress\n",
"Created at: 2024-10-08 00:46:30.694748+00:00\n"
]
}
],
"source": [
"# Prepare a list of questions for batch processing\n",
"questions = [\n",
" \"How do solar panels convert sunlight into electricity?\",\n",
" \"What's the difference between mutual funds and ETFs?\",\n",
" \"What is a pick and roll in basketball?\",\n",
" \"Why do leaves change color in autumn?\"\n",
"]\n",
"\n",
"# Create batch requests\n",
"batch_requests = [\n",
" {\n",
" \"custom_id\": f\"question-{i}\",\n",
" \"params\": {\n",
" \"model\": MODEL_NAME,\n",
" \"max_tokens\": 1024,\n",
" \"messages\": [\n",
" {\"role\": \"user\", \"content\": question}\n",
" ]\n",
" }\n",
" }\n",
" for i, question in enumerate(questions)\n",
"]\n",
"\n",
"# Submit the batch\n",
"response = client.beta.messages.batches.create(\n",
" requests=batch_requests\n",
")\n",
"\n",
"print(f\"Batch ID: {response.id}\")\n",
"print(f\"Status: {response.processing_status}\")\n",
"print(f\"Created at: {response.created_at}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Monitoring Batch Progress\n",
"\n",
"Now let's monitor the batch processing status:"
]
},
{
"cell_type": "code",
"execution_count": 69,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')\n",
"Status: in_progress\n",
"BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')\n",
"Status: in_progress\n",
"BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')\n",
"Status: in_progress\n",
"BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=datetime.datetime(2024, 10, 8, 0, 46, 47, 283392, tzinfo=TzInfo(UTC)), expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='ended', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=0, succeeded=4), results_url='https://api.anthropic.com/v1/messages/batches/msgbatch_01GgqTz9XzriGNHzTSGZsJJ8/results', type='message_batch')\n",
"Status: ended\n",
"\n",
"Batch processing complete!\n",
"\n",
"Request counts:\n",
" Succeeded: 4\n",
" Errored: 0\n",
" Processing: 0\n",
" Canceled: 0\n",
" Expired: 0\n"
]
}
],
"source": [
"def monitor_batch(batch_id, polling_interval=5):\n",
" while True:\n",
" batch_update = client.beta.messages.batches.retrieve(batch_id)\n",
" batch_update_status = batch_update.processing_status\n",
" print(batch_update)\n",
" print(f\"Status: {batch_update_status}\")\n",
" if batch_update_status == \"ended\": \n",
" return batch_update\n",
" \n",
" time.sleep(polling_interval)\n",
"\n",
"# Monitor our batch\n",
"batch_result = monitor_batch(response.id) \n",
"print(\"\\nBatch processing complete!\")\n",
"print(\"\\nRequest counts:\")\n",
"print(f\" Succeeded: {batch_result.request_counts.succeeded}\")\n",
"print(f\" Errored: {batch_result.request_counts.errored}\")\n",
"print(f\" Processing: {batch_result.request_counts.processing}\")\n",
"print(f\" Canceled: {batch_result.request_counts.canceled}\")\n",
"print(f\" Expired: {batch_result.request_counts.expired}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Retrieving Results\n",
"\n",
"Once the batch is complete, we can retrieve and process the results:"
]
},
{
"cell_type": "code",
"execution_count": 70,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=datetime.datetime(2024, 10, 8, 0, 46, 47, 283392, tzinfo=TzInfo(UTC)), expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='ended', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=0, succeeded=4), results_url='https://api.anthropic.com/v1/messages/batches/msgbatch_01GgqTz9XzriGNHzTSGZsJJ8/results', type='message_batch')\n",
"Status: ended\n",
"\n",
"Batch msgbatch_01GgqTz9XzriGNHzTSGZsJJ8 Summary:\n",
"Status: ended\n",
"Created: 2024-10-08 00:46:30.694748+00:00\n",
"Ended: 2024-10-08 00:46:47.283392+00:00\n",
"Expires: 2024-10-09 00:46:30.694748+00:00\n",
"\n",
"Individual Results:\n",
"\n",
"Result for question-0:\n",
"Status: succeeded\n",
"Content: Solar panels convert sunlight into electricity through a process called the photovoltaic effect. Here's a step-by-step explanation of how this works:\n",
"\n",
"1. Solar panel composition:\n",
"Solar panels are made...\n",
"\n",
"Result for question-1:\n",
"Status: succeeded\n",
"Content: Mutual funds and ETFs (Exchange-Traded Funds) are both popular investment vehicles that allow investors to diversify their portfolios, but they have several key differences:\n",
"\n",
"1. Trading:\n",
"- Mutual fund...\n",
"\n",
"Result for question-2:\n",
"Status: succeeded\n",
"Content: A pick and roll, also known as a screen and roll, is a fundamental offensive play in basketball involving two players. Here's how it works:\n",
"\n",
"1. The ball handler (usually a guard) has possession of the...\n",
"\n",
"Result for question-3:\n",
"Status: succeeded\n",
"Content: Leaves change color in autumn due to a combination of factors, primarily related to changes in temperature, daylight, and the tree's biological processes. Here's a breakdown of why this happens:\n",
"\n",
"1. C...\n"
]
}
],
"source": [
"def process_results(batch_id):\n",
" # First get the batch status\n",
" batch = client.beta.messages.batches.retrieve(batch_id)\n",
" \n",
" print(f\"\\nBatch {batch.id} Summary:\")\n",
" print(f\"Status: {batch.processing_status}\")\n",
" print(f\"Created: {batch.created_at}\")\n",
" print(f\"Ended: {batch.ended_at}\")\n",
" print(f\"Expires: {batch.expires_at}\")\n",
" \n",
" if batch.processing_status == \"ended\":\n",
" print(\"\\nIndividual Results:\")\n",
" for result in client.beta.messages.batches.results(batch_id):\n",
" print(f\"\\nResult for {result.custom_id}:\")\n",
" print(f\"Status: {result.result.type}\")\n",
" \n",
" if result.result.type == \"succeeded\":\n",
" print(f\"Content: {result.result.message.content[0].text[:200]}...\")\n",
" elif result.result.type == \"errored\":\n",
" print(\"Request errored\")\n",
" elif result.result.type == \"canceled\":\n",
" print(\"Request was canceled\")\n",
" elif result.result.type == \"expired\":\n",
" print(\"Request expired\")\n",
"\n",
"# Example usage:\n",
"batch_status = monitor_batch(response.id)\n",
"if batch_status.processing_status == \"ended\":\n",
" process_results(batch_status.id)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example 2: Advanced Batch Processing for Different Message Types\n",
"\n",
"This example demonstrates more advanced usage, including error handling and processing different types of requests in a single batch including a simple message, a message with a system prompt, a multi-turn message, and a message with an image. "
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Complex batch ID: msgbatch_011FAkvqkL8pEskdyS3xdmNW\n"
]
}
],
"source": [
"import base64\n",
"def create_complex_batch():\n",
" # Get base64 encoded image\n",
" def get_base64_encoded_image(image_path):\n",
" with open(image_path, \"rb\") as image_file:\n",
" binary_data = image_file.read()\n",
" base_64_encoded_data = base64.b64encode(binary_data)\n",
" base64_string = base_64_encoded_data.decode('utf-8')\n",
" return base64_string\n",
"\n",
" # Mix of different request types\n",
" batch_requests = [\n",
" {\n",
" \"custom_id\": \"simple-question\",\n",
" \"params\": {\n",
" \"model\": MODEL_NAME,\n",
" \"max_tokens\": 1024,\n",
" \"messages\": [\n",
" {\"role\": \"user\", \"content\": \"What is quantum computing?\"}\n",
" ]\n",
" }\n",
" },\n",
" {\n",
" \"custom_id\": \"image-analysis\",\n",
" \"params\": {\n",
" \"model\": MODEL_NAME,\n",
" \"max_tokens\": 1024,\n",
" \"messages\": [\n",
" {\n",
" \"role\": \"user\",\n",
" \"content\": [\n",
" {\n",
" \"type\": \"image\",\n",
" \"source\": {\n",
" \"type\": \"base64\",\n",
" \"media_type\": \"image/jpeg\",\n",
" \"data\": get_base64_encoded_image(\"../images/sunset-dawn-nature-mountain-preview.jpg\")\n",
" }\n",
" },\n",
" {\n",
" \"type\": \"text\",\n",
" \"text\": \"Describe this mountain landscape. What time of day does it appear to be, and what weather conditions do you observe?\"\n",
" }\n",
" ]\n",
" }\n",
" ]\n",
" }\n",
" },\n",
" {\n",
" \"custom_id\": \"system-prompt\",\n",
" \"params\": {\n",
" \"model\": MODEL_NAME,\n",
" \"max_tokens\": 1024,\n",
" \"system\": \"You are a helpful science teacher.\",\n",
" \"messages\": [\n",
" {\"role\": \"user\", \"content\": \"Explain gravity to a 5-year-old.\"}\n",
" ]\n",
" }\n",
" },\n",
" {\n",
" \"custom_id\": \"multi-turn\",\n",
" \"params\": {\n",
" \"model\": MODEL_NAME,\n",
" \"max_tokens\": 1024,\n",
" \"messages\": [\n",
" {\"role\": \"user\", \"content\": \"What is DNA?\"},\n",
" {\"role\": \"assistant\", \"content\": \"DNA is like a blueprint for living things...\"},\n",
" {\"role\": \"user\", \"content\": \"How is DNA copied?\"}\n",
" ]\n",
" }\n",
" }\n",
" ]\n",
" \n",
" try:\n",
" response = client.beta.messages.batches.create(\n",
" requests=batch_requests\n",
" )\n",
" return response.id\n",
" except Exception as e:\n",
" print(f\"Error creating batch: {e}\")\n",
" return None\n",
"complex_batch_id = create_complex_batch()\n",
"print(f\"Complex batch ID: {complex_batch_id}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Great now let's view the results of the batch:"
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')\n",
"Status: in_progress\n",
"BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')\n",
"Status: in_progress\n",
"BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')\n",
"Status: in_progress\n",
"BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')\n",
"Status: in_progress\n",
"BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')\n",
"Status: in_progress\n",
"BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')\n",
"Status: in_progress\n",
"BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=datetime.datetime(2024, 10, 8, 0, 24, 27, 768229, tzinfo=TzInfo(UTC)), expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='ended', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=0, succeeded=4), results_url='https://api.anthropic.com/v1/messages/batches/msgbatch_011FAkvqkL8pEskdyS3xdmNW/results', type='message_batch')\n",
"Status: ended\n",
"\n",
"Batch msgbatch_011FAkvqkL8pEskdyS3xdmNW Summary:\n",
"Status: ended\n",
"Created: 2024-10-08 00:23:58.507550+00:00\n",
"Ended: 2024-10-08 00:24:27.768229+00:00\n",
"Expires: 2024-10-09 00:23:58.507550+00:00\n",
"\n",
"Individual Results:\n",
"\n",
"Result for simple-question:\n",
"Status: succeeded\n",
"Content: Quantum computing is an advanced form of computing that uses the principles of quantum mechanics to process information. Unlike classical computers that use bits (0s and 1s) to store and process data,...\n",
"\n",
"Result for image-analysis:\n",
"Status: succeeded\n",
"Content: This image captures a breathtaking mountain landscape at sunset. The sun is visible as a bright orb just dipping behind the distant mountain ranges, casting a warm golden glow across the entire scene....\n",
"\n",
"Result for system-prompt:\n",
"Status: succeeded\n",
"Content: Sure! Here's how I might explain gravity to a 5-year-old:\n",
"\n",
"Gravity is like a big invisible hug that the Earth gives to everything on it. It's what keeps us stuck to the ground instead of floating away...\n",
"\n",
"Result for multi-turn:\n",
"Status: succeeded\n",
"Content: DNA replication is the process by which DNA makes a copy of itself during cell division. Here's a basic overview of how it works:\n",
"\n",
"1. Unwinding: The double helix structure of DNA unwinds, and the two ...\n"
]
}
],
"source": [
"# Example usage:\n",
"batch_status = monitor_batch(complex_batch_id)\n",
"if batch_status.processing_status == \"ended\":\n",
" process_results(batch_status.id)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "py311",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}