pai-python-sdk/inference/async_inference/async_inference.ipynb

{ "cells": [
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Deploying an Asynchronous Inference Service\n", "\n", "In complex model inference scenarios such as AIGC and video processing, inference can take a long time, leading to problems such as request failures caused by long-connection timeouts and uneven load across instances, so these workloads are a poor fit for real-time inference. To address this, PAI provides asynchronous inference services: after submitting a prediction request, users can obtain the prediction result by polling or by subscription.\n", "\n", "This document describes how to use the PAI Python SDK to deploy and invoke an asynchronous inference service on PAI." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Billing\n", "\n", "This example uses the following cloud services, which incur the corresponding charges:\n", "\n", "- PAI-EAS: deploys the inference service. For billing details, see [PAI-EAS billing](https://help.aliyun.com/zh/pai/product-overview/billing-of-eas)\n", "- OSS: stores the inference service code. For billing details, see [OSS billing overview](https://help.aliyun.com/zh/oss/product-overview/billing-overview)\n", "\n", "> By joining the free trial of these cloud products and using the **designated instance types**, you can try PAI for free; see [PAI free trial](https://help.aliyun.com/zh/pai/product-overview/free-quota-for-new-users).\n", "\n" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Preparation\n", "\n", "Install the PAI Python SDK with the following command." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "\n", "!python -m pip install --upgrade pai" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "\n", "The SDK needs the AccessKey used to access Alibaba Cloud services, as well as the workspace and OSS bucket to use. After installing the PAI Python SDK, run the following command in a **terminal** and follow the prompts to configure the credentials, workspace, and other settings.\n", "\n", "\n", "```shell\n", "\n", "# Run the following command in a terminal.\n", "\n", "python -m pai.toolkit.config\n", "\n", "```\n", "\n", "We can verify the current configuration with the following code." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pai\n", "from pai.session import get_default_session\n", "\n", "print(pai.__version__)\n", "sess = get_default_session()\n", "\n", "assert sess.workspace_name is not None\n", "print(sess.workspace_name)" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Deploying a Model as an Asynchronous Inference Service\n", "\n", "Deploying a model as an asynchronous inference service is similar to deploying a standard online inference service: simply pass `service_type=ServiceType.Async` when calling `Model.deploy`.\n", "\n", "In this walkthrough, we use image-based deployment to deploy an asynchronous inference service.\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# Prepare the source directory for the asynchronous inference service\n", "!mkdir -p serve_src/" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Using the `%%writefile` magic, we write the inference service code to `serve_src/run.py`." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile serve_src/run.py\n", "import asyncio\n", "from random import random\n", "\n", "from fastapi import FastAPI, Request\n", "import uvicorn, json, datetime\n", "\n", "# Default path where the model is mounted in the container\n", "model_path = \"/eas/workspace/model/\"\n", "\n", "app = FastAPI()\n", "\n", "\n", "@app.post(\"/\")\n", "async def create_item(request: Request):\n", "    print(\"Make mock prediction starting ...\")\n", "    # Mock a slow prediction\n", "    await asyncio.sleep(15)\n", "    print(\"Prediction finished.\")\n", "    return [random() for _ in range(10)]\n", "\n", "\n", "if __name__ == \"__main__\":\n", "    uvicorn.run(app, host=\"0.0.0.0\", port=8000, workers=1)" ] },
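{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, we can smoke-test the service code locally before deploying. The cell below is a minimal sketch, assuming `fastapi` and its test dependency `httpx` are installed in the local environment: FastAPI's `TestClient` drives the app in-process, so no server needs to be started. Note that the mock endpoint sleeps for about 15 seconds before responding." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Local smoke test (a sketch; assumes fastapi and httpx are installed locally,\n", "# and that the %%writefile cell above has been run).\n", "from fastapi.testclient import TestClient\n", "\n", "from serve_src.run import app\n", "\n", "client = TestClient(app)\n", "# The mock endpoint sleeps ~15s before responding.\n", "response = client.post(\"/\")\n", "assert response.status_code == 200\n", "print(response.json())  # a list of 10 random floats" ] },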
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "We will deploy the model above using a PyTorch inference image provided by PAI." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pai.model import Model, container_serving_spec\n", "from pai.predictor import ServiceType\n", "from pai.image import retrieve, ImageScope\n", "\n", "m = Model(\n", "    inference_spec=container_serving_spec(\n", "        source_dir=\"serve_src\",\n", "        command=\"python run.py\",\n", "        image_uri=retrieve(\n", "            \"PyTorch\",\n", "            framework_version=\"1.10\",\n", "            accelerator_type=\"gpu\",\n", "            image_scope=ImageScope.INFERENCE,\n", "        ),\n", "        requirements=[\n", "            \"fastapi\",\n", "            \"uvicorn\",\n", "        ],\n", "    ),\n", "    # A model stored on OSS can be passed via the `model_data` parameter;\n", "    # it will be mounted into the inference service container.\n", "    # model_data=\"oss://<YourOssBucket>/path/to/model/\"\n", ")" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "By setting `service_type=ServiceType.Async` at deployment, the model is deployed as an asynchronous inference service. An asynchronous inference service uses an input queue (source) to hold prediction requests and an output queue (sink) to hold prediction results. Through the `options` parameter, you can configure advanced settings such as the resources used by the queues, the maximum queue length, and whether auto-eviction is enabled. For the complete list of advanced parameters supported by asynchronous services, see [Asynchronous services - parameter configuration](https://help.aliyun.com/document_detail/476812.html?#section-gor-gne-gtq)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pai.predictor import AsyncPredictor\n", "from pai.common.utils import random_str\n", "\n", "\n", "service_name = f\"async_service_example_{random_str(6)}\"\n", "\n", "p: AsyncPredictor = m.deploy(\n", "    service_name=service_name,\n", "    instance_type=\"ecs.c6.large\",\n", "    # Deploy the service as an asynchronous service\n", "    service_type=ServiceType.Async,\n", "    # Advanced parameters can be configured via the options field\n", "    options={\n", "        # Detailed async inference parameters: https://help.aliyun.com/document_detail/476812.html\n", "        \"queue.cpu\": 2,  # Number of CPU cores used by the queue, default 1\n", "        \"queue.memory\": 2048,  # Memory used by the queue of the async service, in MB\n", "    },\n", ")\n", "\n", "print()\n", "\n", "print(p)\n", "print(p.service_name)\n", "print(p.access_token)" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Invoking the Inference Service\n", "\n", "Sending a request to an asynchronous inference service works the same way as sending one to a synchronous service, except that the asynchronous service immediately returns the `RequestId` of the prediction request rather than the prediction result. The client obtains the result by polling:\n", "\n", "- The **client** sends an inference request, which is placed in the service's input queue, and PAI-EAS returns the request's RequestId.\n", "- PAI takes the requests from the input queue and forwards them to the **user's inference service**; after the service processes a request, the result is written to the output queue.\n", "- The **client** polls with the RequestId to obtain the prediction result produced by the **inference service**.\n", "\n", "\n", "The PAI Python SDK provides `AsyncPredictor`, which makes it easier to invoke asynchronous inference services." ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Invoking the Asynchronous Inference Service\n", "\n", "`AsyncPredictor` provides the `predict` and `raw_predict` methods for sending prediction requests. Both return an `AsyncTask`, and the prediction result can be obtained via `AsyncTask.result()`.\n", "\n", "The difference between the two is that `predict` uses a `Serializer` object to serialize the input data and deserialize the prediction result, while `raw_predict` sends the input data to the asynchronous inference service as-is and returns the HTTP response (`RawResponse`). A usage sketch is shown below (see also the runnable cell after this one):\n", "\n", "```python\n", "\n", "from pai.predictor import AsyncPredictor, AsyncTask, RawResponse\n", "from pai.serializer import JsonSerializer\n", "\n", "p = AsyncPredictor(service_name='test_async_service', serializer=JsonSerializer())\n", "\n", "t1: AsyncTask = p.predict(data={\"some\": \"data\"})\n", "# result is the response body of the inference service, after being\n", "# processed by Serializer.deserialize.\n", "result = t1.result()\n", "\n", "\n", "t2: AsyncTask = p.raw_predict(data=b'{\"some\": \"data\"}')\n", "resp: RawResponse = t2.result()\n", "print(resp.status_code, resp.content)\n", "\n", "```\n", "\n", "`AsyncPredictor` maintains a thread pool; each prediction request is sent by a worker thread, which then waits for the request to complete. The pool size can be configured via the `max_workers` parameter.\n", "\n", "```python\n", "\n", "p = AsyncPredictor(service_name='test_async_service', max_workers=20)\n", "\n", "```\n", "\n", "When the response needs to be processed after an asynchronous request completes, a callback function can be passed via the `callback` parameter. The callback is invoked with `AsyncTask.result()`, i.e. the actual response result.\n", "\n", "\n" ] },
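{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The following cell is a minimal, runnable sketch of the `predict` path against the service deployed above. Wrapping the service in a new `AsyncPredictor` with a `JsonSerializer` is an illustrative choice: the mock service ignores the request body and returns a JSON list, so JSON serialization of the input is merely convenient here." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pai.predictor import AsyncPredictor, AsyncTask\n", "from pai.serializer import JsonSerializer\n", "\n", "# Sketch: wrap the deployed service with a JsonSerializer so that predict()\n", "# serializes the input to JSON and deserializes the JSON response body.\n", "json_predictor = AsyncPredictor(\n", "    service_name=p.service_name,\n", "    serializer=JsonSerializer(),\n", ")\n", "\n", "t: AsyncTask = json_predictor.predict(data={\"some\": \"data\"})\n", "# result() blocks until the async prediction completes; the mock service\n", "# above returns a list of 10 random floats.\n", "print(t.result())" ] },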
"\n", "# 发送预测请求,使用回调函数处理预测结果。\n", "task: AsyncTask = p.raw_predict(\n", " data=b\"test_data\",\n", " callback=callback_fn,\n", ")\n", "\n", "# result() 方法等待预测完成\n", "resp: RawResponse = task.result()\n", "print(resp.json())\n", "\n", "# 等待回调函数执行完成\n", "time.sleep(1)\n", "\n", "print(results)\n", "assert len(results) == 1" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "以下示例中,我们批量发送异步推理请求,然后等待所有的请求完成。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tasks = []\n", "\n", "for i in range(10):\n", " task: AsyncTask = p.raw_predict(\n", " data=b\"test_data\",\n", " callback=lambda x: print(\"Prediction result: \", x.json()),\n", " )\n", " tasks.append(task)\n", "\n", "prediction_results = [t.result().json() for t in tasks]\n", "\n", "print(prediction_results)\n", "print(len(prediction_results))" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### 使用异步API调用推理服务\n", "\n", "`AsyncPredictor` 提供了异步API `raw_predict_async` 和 `predict_async`,支持用户使用Python提供的异步框架(asyncio)调用推理服务。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pai.predictor import RawResponse\n", "\n", "# 使用异步API调用异步推理服务\n", "res: RawResponse = await p.raw_predict_async(data=b\"test_data\")\n", "\n", "print(res.status_code)\n", "print(res.content)\n", "print(res.json())" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "通过SDK提供的异步API,我们可以不借助于线程池,批量发送异步预测请求。\n", "\n", "以下的示例中,我们将使用异步API,批量发送异步预测请求,等待推理完成,并使用回调函数打印预测请求结果。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "\n", "\n", "# 定义回调函数\n", "def task_done_cb(task: asyncio.Task):\n", " if task.exception():\n", " raise task.exception()\n", " else:\n", " print(\"Prediction result: \", task.result().json())\n", "\n", "\n", "# 使用异步API批量调用异步推理服务\n", "async def batch_predict():\n", " tasks = []\n", " for _ in range(10):\n", " task = asyncio.create_task(\n", " # raw_predict_async 是一个coroutine\n", " p.raw_predict_async(\n", " data=b\"test_data\",\n", " )\n", " )\n", " # 调用完成之后,打印调用返回结果\n", " task.add_done_callback(task_done_cb)\n", "\n", " tasks.append(task)\n", " # 等待所有任务完成\n", " return await asyncio.gather(*tasks, return_exceptions=True)\n", "\n", "\n", "batch_results = await batch_predict()\n", "\n", "\n", "for result in batch_results:\n", " print(result.json())" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "测试完成之后,可以使用`delete_service`方法删除对应服务,释放资源。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "p.delete_service()" ] } ], "metadata": { "kernelspec": { "display_name": "pai-dev-py38", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.19" } }, "nbformat": 4, "nbformat_minor": 2 }