{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# 部署异步推理服务\n",
"\n",
"在复杂的模型推理场景中,例如AIGC、视频处理等场景中,模型服务推理耗时较长,存在长连接超时导致请求失败或实例负载不均衡等问题,不适用于实时推理的场景。针对以上问题,PAI提供了异步推理服务,用于支持类似的场景,用户可以在提交预测请求之后,通过轮询或是订阅的方式获取到推理服务的预测结果。\n",
"\n",
"在当前文档中,我们将介绍如何使用PAI Python SDK在PAI上部署和调用异步推理服务。"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"## 费用说明\n",
"\n",
"本示例将会使用以下云产品,并产生相应的费用账单:\n",
"\n",
"- PAI-EAS:部署推理服务,详细计费说明请参考[PAI-EAS计费说明](https://help.aliyun.com/zh/pai/product-overview/billing-of-eas)\n",
"- OSS:存储推理服务代码等,详细计费说明请参考[OSS计费概述](https://help.aliyun.com/zh/oss/product-overview/billing-overview)\n",
"\n",
"> 通过参与云产品免费试用,使用**指定资源机型**,可以免费试用PAI产品,具体请参考[PAI免费试用](https://help.aliyun.com/zh/pai/product-overview/free-quota-for-new-users)。\n",
"\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## 准备工作\n",
"\n",
"我们可以通过以下命令安装PAI Python SDK。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"\n",
"!python -m pip install --upgrade pai"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"SDK需要配置访问阿里云服务需要的 AccessKey,以及当前使用的工作空间和OSS Bucket。在PAI Python SDK安装之后,通过在 **命令行终端** 中执行以下命令,按照引导配置密钥,工作空间等信息。\n",
"\n",
"\n",
"```shell\n",
"\n",
"# 以下命令,请在 命令行终端 中执行.\n",
"\n",
"python -m pai.toolkit.config\n",
"\n",
"```\n",
"\n",
"我们可以通过以下代码验证当前的配置。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pai\n",
"from pai.session import get_default_session\n",
"\n",
"print(pai.__version__)\n",
"sess = get_default_session()\n",
"\n",
"assert sess.workspace_name is not None\n",
"print(sess.workspace_name)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## 部署异步推理服务模型\n",
"\n",
"将模型部署为异步推理服务与部署标准的在线推理服务类似,用户仅需在部署时(`Model.deploy`),传递`service_type=ServicType.Async`即可。\n",
"\n",
"当前流程中,我们将使用镜像部署的模式,部署一个异步的推理服务。\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"# 准备异步推理服务的应用代码目录\n",
"!mkdir -p serve_src/"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"通过`%%writefile`指令,我们将推理服务代码写入到`serve_src/run.py`文件中。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile serve_src/run.py\n",
"import asyncio\n",
"from random import random\n",
"\n",
"from fastapi import FastAPI, Request\n",
"import uvicorn, json, datetime\n",
"\n",
"# 默认模型加载路径\n",
"model_path = \"/eas/workspace/model/\"\n",
"\n",
"app = FastAPI()\n",
"\n",
"\n",
"@app.post(\"/\")\n",
"async def create_item(request: Request):\n",
" print(\"Make mock prediction starting ...\")\n",
" # Mock prediction\n",
" await asyncio.sleep(15)\n",
" print(\"Prediction finished.\")\n",
" return [random() for _ in range(10)]\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" uvicorn.run(app, host=\"0.0.0.0\", port=8000, workers=1)"
]
},
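{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, the server can be sanity-checked locally before deployment, assuming `fastapi` and `uvicorn` are installed in the local environment (note that the mock prediction takes about 15 seconds to respond):\n",
"\n",
"```shell\n",
"# Start the server, then send a request from another terminal.\n",
"python serve_src/run.py\n",
"curl -X POST http://127.0.0.1:8000/\n",
"```"
]
},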
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"我们将使用PAI提供的PyTorch推理镜像部署以上的模型。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pai.model import Model, container_serving_spec\n",
"from pai.predictor import ServiceType\n",
"from pai.image import retrieve, ImageScope\n",
"\n",
"m = Model(\n",
" inference_spec=container_serving_spec(\n",
" source_dir=\"serve_src\",\n",
" command=\"python run.py\",\n",
" image_uri=retrieve(\n",
" \"PyTorch\",\n",
" framework_version=\"1.10\",\n",
" accelerator_type=\"gpu\",\n",
" image_scope=ImageScope.INFERENCE,\n",
" ),\n",
" requirements=[\n",
" \"fastapi\",\n",
" \"uvicorn\",\n",
" ],\n",
" )\n",
" # 用户可以通过`model_data`参数,传递一个OSS上的模型。相应的模型会被加载到推理服务的容器中。\n",
" # model_data=\"oss://<YourOssBucket>/path/to/model/\"\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"通过设置部署服务的`service_type=ServiceType.Async`参数,我们可以将模型部署为异步推理服务。异步推理服务使用分别使用输入队列(source)和输出队列(sink)保存预测请求和预测结果。通过`options`参数,可以配置队列使用的资源,队列最大长度,是否开启自动驱逐等高阶参数。异步服务支持的完整的高阶参数,请参考文档:[异步服务-参数配置](https://help.aliyun.com/document_detail/476812.html?#section-gor-gne-gtq)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pai.predictor import AsyncPredictor\n",
"from pai.common.utils import random_str\n",
"\n",
"\n",
"service_name = f\"async_service_example_{random_str(6)}\"\n",
"\n",
"p: AsyncPredictor = m.deploy(\n",
" service_name=service_name,\n",
" instance_type=\"ecs.c6.large\",\n",
" # 设置当前部署的服务类型为异步服务\n",
" service_type=ServiceType.Async,\n",
" # 用户可以通过options字段配置高阶参数\n",
" options={\n",
" # 异步推理详细参数文档: https://help.aliyun.com/document_detail/476812.html\n",
" \"queue.cpu\": 2, # 队列使用的CPU核数,默认为1\n",
" \"queue.memory\": 2048, # 异步服务使用过的队列内存,单位为MB\n",
" },\n",
")\n",
"\n",
"print()\n",
"\n",
"print(p)\n",
"print(p.service_name)\n",
"print(p.access_token)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## 调用推理服务\n",
"\n",
"用户发送调用异步队列服务与请求同步推理服务的方式相同,但是异步推理服务会立即返回本次预测请求的`RequestId`,而不是预测结果。用户可以通过轮询获取到推理服务的预测结果。\n",
"\n",
"- **用户客户端**发送推理请求,加入到推理服务的输入队列中,PAI-EAS返回请求的RequestId。\n",
"- PAI处理输入队列中的请求,转发给到**用户的推理服务**,推理服务处理完请求后,将结果写入到输出队列中\n",
"- **用户客户端**可以通过RequestId轮询,可以获取到**用户推理服务**的预测结果\n",
"\n",
"\n",
"PAI Python SDK提供了`AsyncPredictor`,支持用户更加简单得调用异步推理服务。"
]
},
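{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The sketch below is a conceptual illustration of the submit-and-poll loop described above; it is not the SDK's actual implementation, and `submit_request`/`get_result` are hypothetical functions standing in for the input- and output-queue APIs.\n",
"\n",
"```python\n",
"import time\n",
"\n",
"\n",
"def predict_by_polling(submit_request, get_result, data, interval=1.0, timeout=300):\n",
"    # Submit the request to the input queue and receive a RequestId.\n",
"    request_id = submit_request(data)\n",
"    deadline = time.time() + timeout\n",
"    # Poll the output queue until the result for this RequestId appears.\n",
"    while time.time() < deadline:\n",
"        result = get_result(request_id)\n",
"        if result is not None:\n",
"            return result\n",
"        time.sleep(interval)\n",
"    raise TimeoutError(f\"Request {request_id} did not finish within {timeout}s\")\n",
"```"
]
},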
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 调用异步推理服务\n",
"\n",
"`AsyncPredictor`提供了`predict`和`raw_predict`方法发送预测请求,它们都会返回一个`AsyncTask`,用户可以通过`AsyncTask.result()`获取预测结果。 \n",
"\n",
"二者的区别在于`predict`方法会使用`Serializer`对象对输入数据进行序列化,对预测结果进行反序列化,而`raw_predict`方法直接将输入数据发送给异步推理服务,返回HTTP响应结果(`RawResponse`)。\n",
"\n",
"```python\n",
"\n",
"from pai.predictor import AsyncPredictor, AsyncTask\n",
"from pai.serializer import JsonSerializer\n",
"\n",
"p = AsyncPredictor(service_name='test_async_service', serializer=JsonSerializer())\n",
"\n",
"t1: AsyncTask = p.predict(data={\"some\": \"data\"})\n",
"# result是推理服务的响应结果(Response Body),经过Serialzier.deserialize处理后返回的结果.\n",
"result = t1.result()\n",
"\n",
"\n",
"t2: AsyncTask = p.raw_predict(data=b'{\"some\": \"data\"}')\n",
"resp: RawResponse = t2.result()\n",
"print(resp.status_code, resp.content)\n",
"\n",
"```\n",
"\n",
"`AsyncPredictor`会维护一个线程池,通过一个线程去发送推理请求,并等待请求处理完成。用户可以通过`max_workers`参数配置线程池的大小。\n",
"\n",
"```python\n",
"\n",
"p = AsyncPredictor(service_name='test_async_service', max_workers=20)\n",
"\n",
"```\n",
"\n",
"当用户需要在异步请求完成之后,对于响应的结果进行处理时,可以通过`callback`参数传递一个回调函数。回调函数的参数为`AsyncTask.result()`,也就实际响应的结果。\n",
"\n",
"\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"以下的示例代码中,我们将使用`AsyncPredictor`调用异步推理服务,并通过会回调函数处理预测结果。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pai.predictor import RawResponse, AsyncTask\n",
"import time\n",
"\n",
"# 结果列表\n",
"results = []\n",
"\n",
"\n",
"# 定义回调函数\n",
"def callback_fn(resp: RawResponse):\n",
" print(\"Callback: get prediction result \", resp.json())\n",
" results.append(resp.json())\n",
"\n",
"\n",
"# 发送预测请求,使用回调函数处理预测结果。\n",
"task: AsyncTask = p.raw_predict(\n",
" data=b\"test_data\",\n",
" callback=callback_fn,\n",
")\n",
"\n",
"# result() 方法等待预测完成\n",
"resp: RawResponse = task.result()\n",
"print(resp.json())\n",
"\n",
"# 等待回调函数执行完成\n",
"time.sleep(1)\n",
"\n",
"print(results)\n",
"assert len(results) == 1"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"以下示例中,我们批量发送异步推理请求,然后等待所有的请求完成。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tasks = []\n",
"\n",
"for i in range(10):\n",
" task: AsyncTask = p.raw_predict(\n",
" data=b\"test_data\",\n",
" callback=lambda x: print(\"Prediction result: \", x.json()),\n",
" )\n",
" tasks.append(task)\n",
"\n",
"prediction_results = [t.result().json() for t in tasks]\n",
"\n",
"print(prediction_results)\n",
"print(len(prediction_results))"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### 使用异步API调用推理服务\n",
"\n",
"`AsyncPredictor` 提供了异步API `raw_predict_async` 和 `predict_async`,支持用户使用Python提供的异步框架(asyncio)调用推理服务。\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pai.predictor import RawResponse\n",
"\n",
"# 使用异步API调用异步推理服务\n",
"res: RawResponse = await p.raw_predict_async(data=b\"test_data\")\n",
"\n",
"print(res.status_code)\n",
"print(res.content)\n",
"print(res.json())"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"通过SDK提供的异步API,我们可以不借助于线程池,批量发送异步预测请求。\n",
"\n",
"以下的示例中,我们将使用异步API,批量发送异步预测请求,等待推理完成,并使用回调函数打印预测请求结果。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"\n",
"\n",
"# 定义回调函数\n",
"def task_done_cb(task: asyncio.Task):\n",
" if task.exception():\n",
" raise task.exception()\n",
" else:\n",
" print(\"Prediction result: \", task.result().json())\n",
"\n",
"\n",
"# 使用异步API批量调用异步推理服务\n",
"async def batch_predict():\n",
" tasks = []\n",
" for _ in range(10):\n",
" task = asyncio.create_task(\n",
" # raw_predict_async 是一个coroutine\n",
" p.raw_predict_async(\n",
" data=b\"test_data\",\n",
" )\n",
" )\n",
" # 调用完成之后,打印调用返回结果\n",
" task.add_done_callback(task_done_cb)\n",
"\n",
" tasks.append(task)\n",
" # 等待所有任务完成\n",
" return await asyncio.gather(*tasks, return_exceptions=True)\n",
"\n",
"\n",
"batch_results = await batch_predict()\n",
"\n",
"\n",
"for result in batch_results:\n",
" print(result.json())"
]
},
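{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"If you need to cap the number of in-flight requests when using the async API, a standard asyncio pattern is `asyncio.Semaphore`. The sketch below is illustrative and not part of the SDK; it reuses the `p.raw_predict_async` method from above, and the concurrency limit of 5 is an arbitrary choice.\n",
"\n",
"```python\n",
"import asyncio\n",
"\n",
"\n",
"async def bounded_predict(sem: asyncio.Semaphore, data: bytes):\n",
"    # The semaphore ensures that at most `limit` requests run concurrently.\n",
"    async with sem:\n",
"        return await p.raw_predict_async(data=data)\n",
"\n",
"\n",
"async def batch_predict_bounded(n: int = 10, limit: int = 5):\n",
"    sem = asyncio.Semaphore(limit)\n",
"    tasks = [asyncio.create_task(bounded_predict(sem, b\"test_data\")) for _ in range(n)]\n",
"    return await asyncio.gather(*tasks)\n",
"\n",
"\n",
"# bounded_results = await batch_predict_bounded()\n",
"```"
]
},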
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"测试完成之后,可以使用`delete_service`方法删除对应服务,释放资源。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"p.delete_service()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "pai-dev-py38",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.19"
}
},
"nbformat": 4,
"nbformat_minor": 2
}