Update Add new docs & Update docker builder shell
Showing 15 changed files with 824 additions and 12 deletions.
# Python Pipeline Framework

In most deep learning frameworks, model serving is geared toward one-click deployment of a single model. In large-scale industrial AI, however, a single end-to-end deep learning model rarely solves a complex problem; combining several models is the usual way to tackle real-world tasks. An OCR service needs at least a detection model and a recognition model, and a video understanding service typically combines frame extraction, tokenization, audio processing, classification and other models. Designing and implementing a general multi-model composition service is hard: it has to express complex model topologies while keeping the service highly concurrent, highly available, and easy to develop and maintain.

Paddle Serving provides Python Pipeline, a general programming framework for composing multiple models into one service. It addresses the pain points above, substantially improves GPU utilization, and stays easy to develop and maintain.

The documents below cover the basic features, design, usage guide, and optimization guide of the Python Pipeline framework:
- [Python Pipeline Basics]()
- [Python Pipeline Examples]()
- [Python Pipeline Advanced Usage]()
- [Python Pipeline Optimization Guide]()
# Python Pipeline Examples

Deploying a Python Pipeline example takes four steps: download the model, write the configuration, write the code, and run an inference test.

All Pipeline examples live under [examples/Pipeline/](../../examples/Pipeline); there are currently seven types of model examples:
- [PaddleClas](../../examples/Pipeline/PaddleClas)
- [Detection](../../examples/Pipeline/PaddleDetection)
- [bert](../../examples/Pipeline/PaddleNLP/bert)
- [imagenet](../../examples/Pipeline/PaddleClas/imagenet)
- [imdb_model_ensemble](../../examples/Pipeline/imdb_model_ensemble)
- [ocr](../../examples/Pipeline/PaddleOCR/ocr)
- [simple_web_service](../../examples/Pipeline/simple_web_service)

This document uses imdb_model_ensemble to show how to use Pipeline Serving. The code can be found under `Serving/examples/Pipeline/imdb_model_ensemble`. The server-side structure of the example is shown below:

<div align=center>
<img src='../images/pipeline_serving-image4.png' height = "200" align="middle"/>
</div>

**Files needed for deployment**

Five kinds of files are needed. The model files, the configuration file, and the server code are the three files required to build a Pipeline service; the test client and the test dataset are used for testing.
- Model files
- Configuration file (config.yml)
  - Service level: service ports, number of gRPC threads, service timeout, retry count, etc.
  - DAG level: resource type, tracing, performance profiling
  - OP level: model path, concurrency, inference mode, compute device, inference timeout, auto batching, etc.
- Server code (web_server.py)
  - Service level: define the service name, read the configuration file, start the service
  - DAG level: specify the topology between OPs
  - OP level: override OP pre- and post-processing
- Test client
  - Correctness checks
  - Stress tests
- Test dataset
  - Images, text, audio, etc.

## Download the Model

The example downloads the model files with `get_data.sh`. The model files in this example already contain the saved Feed/Fetch Var parameters; if yours were saved without them, see [Saving Serving deployment parameters]() first.
```shell
cd Serving/examples/Pipeline/imdb_model_ensemble
sh get_data.sh
```

## Create config.yml

This example uses the brpc client connection type; grpc or local_predictor can also be chosen.
```yaml
#rpc port; rpc_port and http_port must not both be empty. When rpc_port is empty
#and http_port is not, rpc_port is automatically set to http_port + 1
rpc_port: 18070

#http port; rpc_port and http_port must not both be empty. When rpc_port is set
#and http_port is empty, no http_port is generated automatically
http_port: 18071

#worker_num, maximum concurrency.
#When build_dag_each_worker=True, the framework creates worker_num processes, each building its own gRPC server and DAG.
#When build_dag_each_worker=False, the framework sets max_workers=worker_num for the main thread's gRPC thread pool.
worker_num: 4

#build_dag_each_worker: False, the framework builds one DAG inside the process; True, the framework builds an independent DAG inside each worker process
build_dag_each_worker: False

dag:
    #OP resource type: True for the thread model, False for the process model
    is_thread_op: True

    #number of retries
    retry: 1

    #profiling: True generates Timeline performance data (with some performance cost); False disables it
    use_profile: False

    #maximum channel length, default 0
    channel_size: 0

    #tracer: tracks framework throughput and the state of every OP and channel; no data is generated without it
    tracer:
        #interval between traces, in seconds
        interval_s: 10
op:
    bow:
        #concurrency; thread-level when is_thread_op=True, otherwise process-level
        concurrency: 1

        #client connection type: brpc, grpc or local_predictor
        client_type: brpc

        #number of retries when calling Serving, no retry by default
        retry: 1

        #timeout when calling Serving, in ms
        timeout: 3000

        #Serving IPs
        server_endpoints: ["127.0.0.1:9393"]

        #client-side config of the bow model
        client_config: "imdb_bow_client_conf/serving_client_conf.prototxt"

        #list of fetched results, named by the alias_name of fetch_var in client_config
        fetch_list: ["prediction"]

        #number of requests batched per Serving call, default 1. When batch_size > 1,
        #auto_batching_timeout must be set, otherwise the OP blocks until batch_size is reached
        batch_size: 2

        #auto-batching timeout, used together with batch_size
        auto_batching_timeout: 2000
    cnn:
        #concurrency; thread-level when is_thread_op=True, otherwise process-level
        concurrency: 1

        #client connection type: brpc
        client_type: brpc

        #number of retries when calling Serving, no retry by default
        retry: 1

        #inference timeout, in ms
        timeout: 3000

        #Serving IPs
        server_endpoints: ["127.0.0.1:9292"]

        #client-side config of the cnn model
        client_config: "imdb_cnn_client_conf/serving_client_conf.prototxt"

        #list of fetched results, named by the alias_name of fetch_var in client_config
        fetch_list: ["prediction"]

        #number of requests batched per Serving call, default 1
        batch_size: 2

        #auto-batching timeout, used together with batch_size
        auto_batching_timeout: 2000
    combine:
        #concurrency; thread-level when is_thread_op=True, otherwise process-level
        concurrency: 1

        #number of retries when calling Serving, no retry by default
        retry: 1

        #inference timeout, in ms
        timeout: 3000

        #number of requests batched per Serving call, default 1
        batch_size: 2

        #auto-batching timeout, used together with batch_size
        auto_batching_timeout: 2000
```
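
As a quick sanity check of the configuration (not part of the example itself), the file can be loaded and the per-OP wiring printed; this sketch only assumes that PyYAML is installed:

```python
import yaml

# Load config.yml and print how each OP is wired up.
with open("config.yml") as f:
    conf = yaml.safe_load(f)

print("rpc_port:", conf.get("rpc_port"), "http_port:", conf.get("http_port"))
for name, op_conf in conf.get("op", {}).items():
    print(name,
          "endpoints:", op_conf.get("server_endpoints"),
          "batch_size:", op_conf.get("batch_size", 1),
          "concurrency:", op_conf.get("concurrency", 1))
```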
## Write the Server Code

In the code below, pay attention to the preprocess and postprocess methods of the custom OPs, and to the Combine OP being initialized with input_ops=[bow_op, cnn_op], which sets its list of upstream OPs. The OP names ("bow", "cnn", "combine") correspond to the entries under `op:` in config.yml above.
```python
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
from paddle_serving_app.reader import IMDBDataset


class ImdbRequestOp(RequestOp):
    """RequestOp: unpacks the client request into a feed dict for downstream OPs."""

    def init_op(self):
        self.imdb_dataset = IMDBDataset()
        self.imdb_dataset.load_resource('imdb.vocab')

    def unpack_request_package(self, request):
        dictdata = {}
        for idx, key in enumerate(request.key):
            if key != "words":
                continue
            words = request.value[idx]
            word_ids, _ = self.imdb_dataset.get_words_and_label(words)
            dictdata[key] = np.array(word_ids)
        return dictdata


class CombineOp(Op):
    """CombineOp has no model of its own; it averages the predictions of its upstream OPs."""

    def preprocess(self, input_data):
        combined_prediction = 0
        for op_name, data in input_data.items():
            combined_prediction += data["prediction"]
        data = {"prediction": combined_prediction / 2}
        return data


# Build the DAG: read_op -> (bow_op, cnn_op) -> combine_op -> response_op
read_op = ImdbRequestOp()
bow_op = Op(name="bow",
            input_ops=[read_op],
            server_endpoints=["127.0.0.1:9393"],
            fetch_list=["prediction"],
            client_config="imdb_bow_client_conf/serving_client_conf.prototxt",
            concurrency=1,
            timeout=-1,
            retry=1)
cnn_op = Op(name="cnn",
            input_ops=[read_op],
            server_endpoints=["127.0.0.1:9292"],
            fetch_list=["prediction"],
            client_config="imdb_cnn_client_conf/serving_client_conf.prototxt",
            concurrency=1,
            timeout=-1,
            retry=1)
combine_op = CombineOp(
    name="combine",
    input_ops=[bow_op, cnn_op],
    concurrency=5,
    timeout=-1,
    retry=1)

# use default ResponseOp implementation
response_op = ResponseOp(input_ops=[combine_op])

server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
```

## Start the Service and Verify

Start the service with the server script above (e.g. `python3 web_server.py`), then verify it with the following test client:
```python
from paddle_serving_client.pipeline import PipelineClient
import numpy as np

client = PipelineClient()
# connect to the rpc_port configured in config.yml
client.connect(['127.0.0.1:18070'])

words = 'i am very sad | 0'

# send several asynchronous requests, then collect the results
futures = []
for i in range(3):
    futures.append(
        client.predict(
            feed_dict={"words": words},
            fetch=["prediction"],
            asyn=True))

for f in futures:
    res = f.result()
    if res["ecode"] != 0:
        print(res)
        exit(1)
```
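
For a single synchronous request the same client can be used without futures; a minimal sketch, assuming the same response format as above:

```python
# Synchronous call: without asyn=True, predict() returns the result directly.
res = client.predict(feed_dict={"words": words}, fetch=["prediction"])
print(res)
if res["ecode"] != 0:
    exit(1)
```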
# Python Pipeline Advanced Usage

Advanced usage targets complex scenarios and unlocks more customization, including skipping an OP in the DAG, customizing the request/response data structures, and multi-GPU inference.

## Skipping an OP in the DAG

Skipping an OP in the DAG really means skipping that OP's process stage: make the decision in preprocess, skip process, and return directly after postprocess.
The second element of the tuple returned by preprocess, `is_skip_process=True`, indicates that the process stage of the current OP should be skipped and postprocess entered directly.

```python
def preprocess(self, input_dicts, data_id, log_id):
    """
    In preprocess stage, assembling data for process stage. users can
    override this function for model feed features.
    Args:
        input_dicts: input data to be preprocessed
        data_id: inner unique id
        log_id: global unique id for RTT
    Return:
        input_dict: data for process stage
        is_skip_process: skip process stage or not, False default
        prod_errcode: None default, otherwise, product errors occurred.
                      It is handled in the same way as exception.
        prod_errinfo: "" default
    """
    # multiple previous Op
    if len(input_dicts) != 1:
        _LOGGER.critical(
            self._log(
                "Failed to run preprocess: this Op has multiple previous "
                "inputs. Please override this func."))
        os._exit(-1)
    (_, input_dict), = input_dicts.items()
    return input_dict, False, None, ""
```
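
A sketch of an OP that actually uses the flag, for instance skipping inference when the request carries a hypothetical `use_cache` field (the field name and the caching idea are illustrative, not part of the example):

```python
def preprocess(self, input_dicts, data_id, log_id):
    (_, input_dict), = input_dicts.items()
    if input_dict.get("use_cache") == "1":
        # Second return value True: skip this OP's process stage and go
        # straight to postprocess with input_dict unchanged.
        return input_dict, True, None, ""
    return input_dict, False, None, ""
```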

## Customizing the Request and Response proto Structures

When the default proto structures do not meet business needs, modify the Request and Response messages in the two files below, keeping the two protos consistent with each other:

> pipeline/gateway/proto/gateway.proto
> pipeline/proto/pipeline_service.proto

Then recompile Serving Server.

## Customizing the URL

The grpc gateway handles POST requests; the default `method` is `prediction`, e.g. 127.0.0.1:8080/ocr/prediction. Both name and method can be customized, so a service with an existing URL can be switched over seamlessly.

```proto
service PipelineService {
  rpc inference(Request) returns (Response) {
    option (google.api.http) = {
      post : "/{name=*}/{method=*}"
      body : "*"
    };
  }
};
```
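
For reference, a request against such a URL might look like the sketch below. The host/port and URL path follow the OCR example mentioned above, and the body layout assumes the default Request message with parallel `key`/`value` string lists; adjust all of these to your own service.

```python
import json
import requests

# Hypothetical endpoint following the /{name}/{method} pattern from the proto above.
url = "http://127.0.0.1:8080/ocr/prediction"
payload = {"key": ["image"], "value": ["<base64-encoded image bytes>"]}

resp = requests.post(url, data=json.dumps(payload))
print(resp.json())
```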

## Batch Inference

Pipeline supports batch inference, and increasing the batch size improves GPU utilization. Python Pipeline supports three forms of batching; the applicable scenarios are:
- Scenario 1: one inference request carries a batch of data (batch)
  - Items of fixed length, variable batch size; data is converted to BCHW format
  - Items of variable length are padded to a fixed length in preprocessing
- Scenario 2: the batch in one inference request is split into several smaller chunks for inference (mini-batch)
  - Padding aligns to the longest item, so one "extremely large" item slows down the whole batch
  - Specifying a chunk size confines the impact of such "extremely large" items
- Scenario 3: data from several requests is merged and inferred together (auto-batching)
  - When inference takes clearly longer than pre/post-processing, merging several requests into one inference pass raises throughput and GPU utilization
  - The data of the merged requests must have the same shape

| Interface | Description |
| :------------------------------------------: | :-----------------------------------------: |
| batch | the client sends batched data and sets batch=True in client.predict |
| mini-batch | preprocess returns a list; see the RecOp preprocess in the OCR example and the sketch below |
| auto-batching | set batch_size and auto_batching_timeout at the OP level in config.yml |
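
A minimal mini-batch sketch (the `images` field, the input name `"x"` and the chunk size are illustrative, not the actual RecOp implementation): preprocess returns a list of feed dicts, one per chunk, instead of a single dict.

```python
def preprocess(self, input_dicts, data_id, log_id):
    (_, input_dict), = input_dicts.items()
    images = input_dict["images"]      # a batch of already-preprocessed items
    mini_batch_size = 16

    # Split one large request batch into fixed-size chunks so that padding
    # to the longest item only affects items within the same chunk.
    feed_list = []
    for i in range(0, len(images), mini_batch_size):
        feed_list.append({"x": images[i:i + mini_batch_size]})

    return feed_list, False, None, ""
```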

## Single-Machine Multi-GPU Inference

For single-machine multi-GPU inference, M OP processes are bound to N GPU cards. Three settings in `config.yml` are involved: the process model (is_thread_op: False), the concurrency (i.e. the number of processes), and devices (the GPU card IDs). Processes are bound by cycling through the GPU card IDs as they start. For example, if 7 OP processes are started and `config.yml` sets devices: "0,1,2", the 1st, 4th and 7th processes are bound to card 0, the 2nd and 5th to card 1, and the 3rd and 6th to card 2:
- process ID 0 is bound to GPU card 0
- process ID 1 is bound to GPU card 1
- process ID 2 is bound to GPU card 2
- process ID 3 is bound to GPU card 0
- process ID 4 is bound to GPU card 1
- process ID 5 is bound to GPU card 2
- process ID 6 is bound to GPU card 0

Hardware settings in `config.yml`:
```
#compute device IDs; when devices is "" or unset, inference runs on CPU;
#when devices is "0" or "0,1,2", inference runs on the listed GPU cards
devices: "0,1,2"
```
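
The binding rule above is a simple round-robin over the configured card IDs. A small sketch of the idea (illustrative only, not the framework's internal code):

```python
def bind_devices(num_processes, devices="0,1,2"):
    """Return the GPU card assigned to each OP process index (round-robin)."""
    cards = [d.strip() for d in devices.split(",") if d.strip()]
    if not cards:                      # empty devices string -> CPU inference
        return {i: "cpu" for i in range(num_processes)}
    return {i: cards[i % len(cards)] for i in range(num_processes)}

# With 7 processes and devices "0,1,2":
# {0: '0', 1: '1', 2: '2', 3: '0', 4: '1', 5: '2', 6: '0'}
print(bind_devices(7))
```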