# NL2SQL SSE 流式响应解析器 ## 1. SSE 报文格式 ### 1.1 基本格式 SSE (Server-Sent Events) 格式如下: ``` event: data: ``` 注意:每个事件以 `\n\n` (两个换行符) 结尾。 ### 1.2 事件类型 | 事件类型 | 说明 | 触发时机 | |---------|------|---------| | `message` | 节点状态通知 | 节点开始执行或执行完成时 | | `dict` | 流式数据块 | LLM生成SQL的每个token片段 | | `error` | 错误信息 | 节点执行出错时 | | `end` | 流式结束 | 整个流程完成时 | --- ## 2. 各事件详细格式 ### 2.1 message 事件 - 节点状态通知 **节点开始执行:** ``` event: message data: {"node": "get_rec_dataset", "msg": "\n##开始执行##\n", "task": -1} ``` **节点执行完成:** ``` event: message data: {"node": "get_rec_dataset", "msg": "\n##执行完成##\n", "task": -1} ``` **data 字段说明:** | 字段 | 类型 | 说明 | |-----|------|------| | node | string | 节点名称 | | msg | string | 消息内容,包含节点状态标识 | | task | int | 任务ID(通常为 -1) | ### 2.2 dict 事件 - 流式数据块 **格式:** ``` event: dict data: {"node": "generate_process", "msg": "SELECT", "task": -1} ``` ``` event: dict data: {"node": "generate_process", "msg": " *", "task": -1} ``` **说明:** - 每个事件携带一个SQL片段 - 需要累积所有片段才能得到完整SQL - 通常来自 `generate_process` 节点 **data 字段说明:** | 字段 | 类型 | 说明 | |-----|------|------| | node | string | 节点名称(通常为 generate_process) | | msg | string | SQL片段(单个token或多个字符) | | task | int | 任务ID | ### 2.3 error 事件 - 错误信息 **格式:** ``` event: error data: {"node": "generate_process", "msg": "错误详情", "task": -1} ``` **data 字段说明:** | 字段 | 类型 | 说明 | |-----|------|------| | node | string | 出错的节点名称 | | msg | string | 错误消息 | | task | int | 任务ID | ### 2.4 end 事件 - 流式结束 **格式:** ``` event: end data: {"status": "done", "answer": "[Done]"} ``` **data 字段说明:** | 字段 | 类型 | 说明 | |-----|------|------| | status | string | 状态("done") | | answer | string | 结束标识("[Done]") | --- ## 3. 完整流程示例 以下是完整的 SSE 流式响应示例: ``` event: message data: {"node": "get_rec_dataset", "msg": "\n##开始执行##\n", "task": -1} event: message data: {"node": "get_rec_dataset", "msg": "\n##执行完成##\n", "task": -1} event: message data: {"node": "get_rec_dataset_info", "msg": "\n##开始执行##\n", "task": -1} event: message data: {"node": "get_rec_dataset_info", "msg": "\n##执行完成##\n", "task": -1} event: message data: {"node": "generate_process", "msg": "\n##开始执行##\n", "task": -1} event: dict data: {"node": "generate_process", "msg": "SELECT", "task": -1} event: dict data: {"node": "generate_process", "msg": " *", "task": -1} event: dict data: {"node": "generate_process", "msg": " FROM", "task": -1} event: dict data: {"node": "generate_process", "msg": " table_name", "task": -1} event: dict data: {"node": "generate_process", "msg": " WHERE", "task": -1} event: dict data: {"node": "generate_process", "msg": " condition", "task": -1} event: message data: {"node": "generate_process", "msg": "\n##执行完成##\n", "task": -1} event: end data: {"status": "done", "answer": "[Done]"} ``` --- ## 4. 节点执行顺序 根据 Pipeline 配置,节点按拓扑顺序执行: ``` get_rec_dataset (数据集推荐)        ↓ get_rec_dataset_info (数据集信息获取)        ↓ generate_process (SQL生成) ``` --- ## 5. 使用示例 ### 5.1 异步客户端(推荐) ```python import asyncio from client.nl2sql_client import NL2SQLClient async def main():     client = NL2SQLClient(base_url="http://localhost:8000")          # 方式1:获取完整结果     result = await client.generate_sql(         user_question="我想看看对公客户评级到期当日需推送的客户经理数量",         bbk="512",         domain="cmb_su",         token="your-token"     )     print(f"SQL: {result.sql}")     print(f"成功: {result.success}")          # 方式2:流式处理(实时显示)     async for event in client.generate_sql_stream(         user_question="...",         bbk="512",         domain="cmb_su",         token="your-token"     ):         if event.event.name == "DICT":             print(event.message, end="", flush=True)          await client.close() asyncio.run(main()) ``` ### 5.2 同步客户端 ```python from client.nl2sql_client import NL2SQLClientSync client = NL2SQLClientSync(base_url="http://localhost:8000") result = client.generate_sql(     user_question="我想看看对公客户评级到期当日需推送的客户经理数量",     bbk="512",     domain="cmb_su",     token="your-token" ) print(f"SQL: {result.sql}") ``` ### 5.3 仅使用解析器 ```python from client.sse_parser import SSEParser parser = SSEParser() # 设置回调 parser.on_node_start = lambda node: print(f"[开始] {node}") parser.on_node_complete = lambda node: print(f"[完成] {node}") parser.on_data_chunk = lambda node, chunk: print(chunk, end="") # 解析 SSE 行 parser.parse_line('event: message\\ndata: {"node": "get_rec_dataset", "msg": "##开始执行##"}\\n\\n') # 获取累积结果 sql = parser.get_node_result("generate_process") ``` --- ## 6. 响应头说明 | 响应头 | 值 | 说明 | |-------|---|------| | Content-Type | text/event-stream | SSE 流式响应 | | Cache-Control | no-cache | 禁止缓存 | | Connection | keep-alive | 保持连接 | | X-Request-ID | | 请求ID(用于追踪) | --- ## 7. 错误处理 ### 7.1 客户端断开连接 服务端会检测客户端断开,并终止流式响应。 ### 7.2 服务端错误 错误通过 `error` 事件返回: ``` event: error data: {"node": "generate_process", "msg": "错误详情", "task": -1} ``` ### 7.3 服务端繁忙(429) 当并发超过限制时,返回 HTTP 429: ``` HTTP/1.1 429 Too Many Requests {"detail": "服务器繁忙,请稍后再试"} ``` --- ## 8. 文件说明 | 文件 | 说明 | |-----|------| | `sse_parser.py` | SSE 解析器核心实现 | | `nl2sql_client.py` | NL2SQL 客户端封装 | | `README.md` | 本文档 |