stream_output_help.md 6.4 KB

NL2SQL SSE 流式响应解析器

1. SSE 报文格式

1.1 基本格式

SSE (Server-Sent Events) 格式如下:

event: <event_type>
data: <json_data>

注意:每个事件以 \n\n (两个换行符) 结尾。

1.2 事件类型

事件类型 说明 触发时机
message 节点状态通知 节点开始执行或执行完成时
dict 流式数据块 LLM生成SQL的每个token片段
error 错误信息 节点执行出错时
end 流式结束 整个流程完成时

2. 各事件详细格式

2.1 message 事件 - 节点状态通知

节点开始执行:

event: message
data: {"node": "get_rec_dataset", "msg": "\n##开始执行<get_rec_dataset>##\n", "task": -1}

节点执行完成:

event: message
data: {"node": "get_rec_dataset", "msg": "\n##执行完成<get_rec_dataset>##\n", "task": -1}

data 字段说明: | 字段 | 类型 | 说明 | |-----|------|------| | node | string | 节点名称 | | msg | string | 消息内容,包含节点状态标识 | | task | int | 任务ID(通常为 -1) |

2.2 dict 事件 - 流式数据块

格式:

event: dict
data: {"node": "generate_process", "msg": "SELECT", "task": -1}
event: dict
data: {"node": "generate_process", "msg": " *", "task": -1}

说明:

  • 每个事件携带一个SQL片段
  • 需要累积所有片段才能得到完整SQL
  • 通常来自 generate_process 节点 data 字段说明: | 字段 | 类型 | 说明 | |-----|------|------| | node | string | 节点名称(通常为 generate_process) | | msg | string | SQL片段(单个token或多个字符) | | task | int | 任务ID |

    2.3 error 事件 - 错误信息

    格式:

    event: error
    data: {"node": "generate_process", "msg": "错误详情", "task": -1}
    

    data 字段说明: | 字段 | 类型 | 说明 | |-----|------|------| | node | string | 出错的节点名称 | | msg | string | 错误消息 | | task | int | 任务ID |

    2.4 end 事件 - 流式结束

    格式:

    event: end
    data: {"status": "done", "answer": "[Done]"}
    

    data 字段说明: | 字段 | 类型 | 说明 | |-----|------|------| | status | string | 状态("done") |

    | answer | string | 结束标识("[Done]") |

    3. 完整流程示例

    以下是完整的 SSE 流式响应示例:

    event: message
    data: {"node": "get_rec_dataset", "msg": "\n##开始执行<get_rec_dataset>##\n", "task": -1}
    event: message
    data: {"node": "get_rec_dataset", "msg": "\n##执行完成<get_rec_dataset>##\n", "task": -1}
    event: message
    data: {"node": "get_rec_dataset_info", "msg": "\n##开始执行<get_rec_dataset_info>##\n", "task": -1}
    event: message
    data: {"node": "get_rec_dataset_info", "msg": "\n##执行完成<get_rec_dataset_info>##\n", "task": -1}
    event: message
    data: {"node": "generate_process", "msg": "\n##开始执行<generate_process>##\n", "task": -1}
    event: dict
    data: {"node": "generate_process", "msg": "SELECT", "task": -1}
    event: dict
    data: {"node": "generate_process", "msg": " *", "task": -1}
    event: dict
    data: {"node": "generate_process", "msg": " FROM", "task": -1}
    event: dict
    data: {"node": "generate_process", "msg": " table_name", "task": -1}
    event: dict
    data: {"node": "generate_process", "msg": " WHERE", "task": -1}
    event: dict
    data: {"node": "generate_process", "msg": " condition", "task": -1}
    event: message
    data: {"node": "generate_process", "msg": "\n##执行完成<generate_process>##\n", "task": -1}
    event: end
    data: {"status": "done", "answer": "[Done]"}
    

    4. 节点执行顺序

    根据 Pipeline 配置,节点按拓扑顺序执行:

    get_rec_dataset (数据集推荐)
           ↓
    get_rec_dataset_info (数据集信息获取)
           ↓
    generate_process (SQL生成)
    

    5. 使用示例

    5.1 异步客户端(推荐)

    import asyncio
    from client.nl2sql_client import NL2SQLClient
    async def main():
        client = NL2SQLClient(base_url="http://localhost:8000")
        
        # 方式1:获取完整结果
        result = await client.generate_sql(
            user_question="我想看看对公客户评级到期当日需推送的客户经理数量",
            bbk="512",
            domain="cmb_su",
            token="your-token"
        )
        print(f"SQL: {result.sql}")
        print(f"成功: {result.success}")
        
        # 方式2:流式处理(实时显示)
        async for event in client.generate_sql_stream(
            user_question="...",
            bbk="512",
            domain="cmb_su",
            token="your-token"
        ):
            if event.event.name == "DICT":
                print(event.message, end="", flush=True)
        
        await client.close()
    asyncio.run(main())
    

    5.2 同步客户端

    from client.nl2sql_client import NL2SQLClientSync
    client = NL2SQLClientSync(base_url="http://localhost:8000")
    result = client.generate_sql(
        user_question="我想看看对公客户评级到期当日需推送的客户经理数量",
        bbk="512",
        domain="cmb_su",
        token="your-token"
    )
    print(f"SQL: {result.sql}")
    

    5.3 仅使用解析器

    from client.sse_parser import SSEParser
    parser = SSEParser()
    # 设置回调
    parser.on_node_start = lambda node: print(f"[开始] {node}")
    parser.on_node_complete = lambda node: print(f"[完成] {node}")
    parser.on_data_chunk = lambda node, chunk: print(chunk, end="")
    # 解析 SSE 行
    parser.parse_line('event: message\\ndata: {"node": "get_rec_dataset", "msg": "##开始执行<get_rec_dataset>##"}\\n\\n')
    # 获取累积结果
    sql = parser.get_node_result("generate_process")
    

    6. 响应头说明


    7. 错误处理

    7.1 客户端断开连接

    服务端会检测客户端断开,并终止流式响应。

    7.2 服务端错误

    错误通过 error 事件返回:

    event: error
    data: {"node": "generate_process", "msg": "错误详情", "task": -1}
    

    7.3 服务端繁忙(429)

    当并发超过限制时,返回 HTTP 429:

    HTTP/1.1 429 Too Many Requests
    {"detail": "服务器繁忙,请稍后再试"}
    

    8. 文件说明

    响应头 说明
    Content-Type text/event-stream SSE 流式响应
    Cache-Control no-cache 禁止缓存
    Connection keep-alive 保持连接
    X-Request-ID 请求ID(用于追踪)
    文件 说明
    sse_parser.py SSE 解析器核心实现
    nl2sql_client.py NL2SQL 客户端封装
    README.md 本文档