瀏覽代碼

first commit

ysl2007 5 月之前
父節點
當前提交
9982545be8
共有 41 個文件被更改,包括 3519 次插入0 次删除
  1. 3 0
      app/.env
  2. 16 0
      app/const.py
  3. 3 0
      app/data_access/__init__.py
  4. 100 0
      app/data_access/duckdb_client.py
  5. 39 0
      app/endpoints/generate_qa_pair.py
  6. 70 0
      app/endpoints/schema.py
  7. 6 0
      app/llm/__init__.py
  8. 142 0
      app/llm/llm_manager.py
  9. 56 0
      app/llm/parser_manager.py
  10. 61 0
      app/llm/parsers.py
  11. 38 0
      app/log/__init__.py
  12. 42 0
      app/log/prompts/generate_qa_pair/DB001/prompt.txt
  13. 30 0
      app/log/test_results/generate_qa_pair_response.txt
  14. 17 0
      app/main.py
  15. 4 0
      app/operators/__init__.py
  16. 309 0
      app/operators/generate_qa_pair.py
  17. 81 0
      app/operators/get_dashboard_info.py
  18. 3 0
      app/pipeline/__init__.py
  19. 73 0
      app/pipeline/pipeline_manager.py
  20. 75 0
      app/pipeline/workflow.py
  21. 114 0
      app/prompt_manager.py
  22. 8 0
      config/pipeline_settings.yaml
  23. 7 0
      config/prompt_template.yaml
  24. 152 0
      data/data_test.ipynb
  25. 二進制
      data/duckdb/bi_metadata.duckdb
  26. 179 0
      data/simulate_bi_metadata.ipynb
  27. 96 0
      docs/software_requirements.md
  28. 45 0
      field_similarity.py
  29. 7 0
      gdb_utils/__init__.py
  30. 333 0
      gdb_utils/opengauss_pool.py
  31. 438 0
      gdb_utils/opengauss_pool_hardened.py
  32. 20 0
      prompts/generate_qa_pair.txt
  33. 21 0
      prompts/generate_qa_pair_with_user_request.txt
  34. 17 0
      requirements.txt
  35. 52 0
      test_field_similarity.py
  36. 109 0
      tests/db_test.ipynb
  37. 50 0
      tests/test_generate_endpoint.py
  38. 37 0
      tests/test_generate_operator.py
  39. 340 0
      tests/test_opengauss_pool.py
  40. 298 0
      tests/test_opengauss_pool_hardened.py
  41. 28 0
      tests/test_workflow.py

+ 3 - 0
app/.env

@@ -0,0 +1,3 @@
+OPENAI_АРI_KЕY=Z0sYmаBTd8оЕHGхМХ@ТoK@8QуНМRSUWm
+OPENAI_BASE_URL=https://api.deepinfra.com/v1/openai
+LLM_MODEL=Qwen/Qwen3-Coder-480B-AB5B-Instruct-Turbo

+ 16 - 0
app/const.py

@@ -0,0 +1,16 @@
+"""Project-wide constants and default locations."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+
+APP_DIR = Path(__file__).resolve().parent
+ROOT_DIR = APP_DIR.parent
+CONFIG_DIR = ROOT_DIR / "config"
+PROMPT_DIR = ROOT_DIR / "prompts"
+DATA_DIR = ROOT_DIR / "data"
+LOG_ROOT = APP_DIR / "log"
+
+# Business constants
+MAX_INPUT_CARD_IDS = 3

+ 3 - 0
app/data_access/__init__.py

@@ -0,0 +1,3 @@
+from .duckdb_client import DuckDBClient
+
+__all__ = ["DuckDBClient"]

+ 100 - 0
app/data_access/duckdb_client.py

@@ -0,0 +1,100 @@
+"""Utility layer for querying BI metadata stored inside DuckDB."""
+
+from __future__ import annotations
+
+from pathlib import Path
+from threading import Lock
+from typing import Any, Dict, Iterable, List, Optional, Sequence
+
+import duckdb
+
+from app.const import DATA_DIR
+
+
+class DuckDBClient:
+    """Thread-safe helper around ``duckdb.DuckDBPyConnection``."""
+
+    def __init__(self, db_path: Optional[Path] = None) -> None:
+        default_path = DATA_DIR / "duckdb" / "bi_metadata.duckdb"
+        path = Path(db_path or default_path)
+        path.parent.mkdir(parents=True, exist_ok=True)
+        self._conn = duckdb.connect(str(path))
+        self._lock = Lock()
+
+    def _query(self, sql: str, params: Sequence[Any] | None = None) -> List[Dict[str, Any]]:
+        params = params or []
+        with self._lock:
+            cursor = self._conn.execute(sql, params)
+            rows = cursor.fetchall()
+            columns = [desc[0] for desc in cursor.description]
+        return [dict(zip(columns, row)) for row in rows]
+
+    def fetch_dashboard_cards(
+        self,
+        dashboard_id: str,
+        card_ids: Optional[Iterable[str]] = None,
+        bbk_id: Optional[str] = None,
+    ) -> List[Dict[str, Any]]:
+        """Return card metadata for the requested dashboard and optional card list."""
+
+        sql = [
+            "SELECT card_id, dashboard_id, dataset_id, dashboard_name, card_name, dataset_name, bbk_id",
+            "FROM card_dataset_dashboard_mapping",
+            "WHERE dashboard_id = ?",
+        ]
+        params: List[Any] = [dashboard_id]
+        if bbk_id:
+            sql.append("AND bbk_id = ?")
+            params.append(bbk_id)
+        if card_ids:
+            placeholders = ",".join(["?"] * len(list(card_ids)))
+            sql.append(f"AND card_id IN ({placeholders})")
+            params.extend(list(card_ids))
+        statement = " ".join(sql)
+        return self._query(statement, params)
+
+    def fetch_card_definition(self, card_ids: Iterable[str]) -> List[Dict[str, Any]]:
+        """Retrieve SELECT/WHERE/GROUP BY definitions for the provided cards."""
+
+        card_ids = list(card_ids)
+        if not card_ids:
+            return []
+        placeholders = ",".join(["?"] * len(card_ids))
+        statement = (
+            "SELECT card_id, card_name, card_desc, sql_select, sql_where, sql_groupby "
+            "FROM card_info WHERE card_id IN (" + placeholders + ")"
+        )
+        return self._query(statement, list(card_ids))
+
+    def fetch_card_filters(self, card_ids: Iterable[str]) -> List[Dict[str, Any]]:
+        card_ids = list(card_ids)
+        if not card_ids:
+            return []
+        placeholders = ",".join(["?"] * len(card_ids))
+        statement = (
+            "SELECT card_id, filter_id, filter_type, where_clause, default_value, options "
+            "FROM card_filter_info WHERE card_id IN (" + placeholders + ")"
+        )
+        return self._query(statement, list(card_ids))
+
+    def fetch_dashboard_info(self, dashboard_id: str) -> Optional[Dict[str, Any]]:
+        rows = self._query(
+            "SELECT dashboard_id, dashboard_name, dashboard_desc, folder_path FROM dashboard_info WHERE dashboard_id = ?",
+            [dashboard_id],
+        )
+        return rows[0] if rows else None
+
+    def fetch_dataset_ddl(self, dataset_ids: Iterable[str]) -> List[Dict[str, Any]]:
+        dataset_ids = list(dataset_ids)
+        if not dataset_ids:
+            return []
+        placeholders = ",".join(["?"] * len(dataset_ids))
+        statement = (
+            "SELECT dataset_id, dataset_ddl FROM dataset_ddl WHERE dataset_id IN ("
+            + placeholders
+            + ")"
+        )
+        return self._query(statement, list(dataset_ids))
+
+
+__all__ = ["DuckDBClient"]

+ 39 - 0
app/endpoints/generate_qa_pair.py

@@ -0,0 +1,39 @@
+
+from fastapi import APIRouter, HTTPException
+
+from app.const import MAX_INPUT_CARD_IDS
+from app.endpoints.schema import QARequest, QAResponse, QAResponseBody
+from app.pipeline.workflow import Workflow
+
+
+router = APIRouter()
+
+
+@router.post("/generate_qa_pair", response_model=QAResponse)
+async def generate_qa_pair(request_body: QARequest):
+    if len(request_body.card_ids) > MAX_INPUT_CARD_IDS:
+        raise HTTPException(
+            status_code=400,
+            detail=f"Number of card IDs exceeds the maximum limit of {MAX_INPUT_CARD_IDS}.",
+        )
+    try:
+        workflow = Workflow()
+        input_args = {
+            "dashboard_id": request_body.dashboard_id,
+            "card_ids": request_body.card_ids,
+            "bbk": request_body.bbk_id,
+            "user_request": request_body.user_request or "",
+        }
+        result = await workflow.execute_workflow(input_args)
+        qa_response = QAResponse(
+            returnCode="SUCCESS",
+            body=QAResponseBody(
+                dashboard_id=request_body.dashboard_id,
+                card_ids=request_body.card_ids,
+                qa_pairs=result,
+                bbk_id=request_body.bbk_id,
+            ),
+        )
+        return qa_response
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

+ 70 - 0
app/endpoints/schema.py

@@ -0,0 +1,70 @@
+from typing import Any, Dict, List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class CardFilter(BaseModel):
+    """Metadata describing a filter bound to a BI card."""
+
+    card_id: str
+    filter_id: str
+    type: str
+    where_clause: str
+    default_value: Optional[str] = None
+    options: List[str] = Field(default_factory=list)
+
+
+class CardInfo(BaseModel):
+    card_id: str
+    card_name: str
+    card_desc: Optional[str] = None
+    dataset_id: str
+    sql_select: str
+    sql_where: Optional[Dict[str, Any]] = None
+    sql_groupby: Optional[str] = None
+    filters: List[CardFilter] = Field(default_factory=list)
+
+
+class DashboardInfo(BaseModel):
+    dashboard_id: str
+    dashboard_name: str
+    dashboard_desc: Optional[str] = None
+    folder_path: Optional[str] = None
+    cards: List[CardInfo] = Field(default_factory=list)
+
+
+class QARequest(BaseModel):
+    request_id: str
+    dashboard_id: str
+    card_ids: List[str]
+    bbk_id: str
+    user_token: Optional[str] = None
+    user_request: Optional[str] = None
+
+
+class QAPair(BaseModel):
+    dashboard_id: str
+    dashboard_name: str
+    dashboard_desc: Optional[str]
+    card_id: str
+    card_name: str
+    card_desc: Optional[str]
+    dataset_id: str
+    question: str
+    answer: str = ""
+    question_with_slot: str = ""
+    filter_ids: List[str] = Field(default_factory=list)
+    filters: List[str] = Field(default_factory=list)
+    defs: List[str] = Field(default_factory=list)
+
+
+class QAResponseBody(BaseModel):
+    dashboard_id: str
+    bbk_id: str
+    card_ids: List[str]
+    qa_pairs: List[QAPair]
+
+
+class QAResponse(BaseModel):
+    returnCode: str = "SUCCESS"
+    body: QAResponseBody

+ 6 - 0
app/llm/__init__.py

@@ -0,0 +1,6 @@
+"""LLM utilities exposed as package level imports."""
+
+from .llm_manager import LLMManager, LLMResponse
+from .parser_manager import ParserManager
+
+__all__ = ["LLMManager", "LLMResponse", "ParserManager"]

+ 142 - 0
app/llm/llm_manager.py

@@ -0,0 +1,142 @@
+"""Management utilities for heterogeneous LLM providers."""
+
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from pathlib import Path
+from threading import Lock
+from typing import Any, Dict, Optional, Type
+
+import yaml
+
+from app.const import CONFIG_DIR
+from .parser_manager import ParserManager
+from .parsers import BaseParser, ParsedResult, TextParser
+
+
+@dataclass
+class LLMResponse:
+    content: str
+    raw: Dict[str, Any]
+
+
+class BaseLLMClient:
+    """Base class that concrete providers inherit from."""
+
+    def __init__(self, name: str, parser: Optional[BaseParser] = None, **config: Any) -> None:
+        self.name = name
+        self.config = config
+        self.parser = parser or TextParser()
+
+    async def ainvoke(self, prompt: str, **kwargs: Any) -> LLMResponse:  # pragma: no cover - abstract
+        raise NotImplementedError
+
+    def _wrap(self, parsed: ParsedResult) -> LLMResponse:
+        content = parsed.data
+        if not isinstance(content, str):
+            content = json.dumps(content, ensure_ascii=False)
+        return LLMResponse(content=content, raw={"metadata": parsed.metadata})
+
+
+class EchoLLMClient(BaseLLMClient):
+    """Fallback client that simply echoes prompts."""
+
+    async def ainvoke(self, prompt: str, **_: Any) -> LLMResponse:
+        parsed = self.parser.parse(prompt)
+        return self._wrap(parsed)
+
+
+class HttpLLMClient(BaseLLMClient):
+    """HTTP JSON based LLM provider."""
+
+    async def ainvoke(self, prompt: str, **kwargs: Any) -> LLMResponse:
+        if httpx is None:
+            raise ImportError("httpx is required for HttpLLMClient")
+        endpoint = self.config["endpoint"]
+        method = self.config.get("method", "POST").upper()
+        timeout = self.config.get("timeout", 60)
+        payload = dict(self.config.get("payload", {}))
+        payload_field = self.config.get("prompt_field", "prompt")
+        payload[payload_field] = prompt
+        payload.update(kwargs.get("extra_payload", {}))
+        headers = self.config.get("headers", {})
+
+        async with httpx.AsyncClient(timeout=timeout) as client:
+            response = await client.request(method, endpoint, json=payload, headers=headers)
+        response.raise_for_status()
+        response_json = response.json()
+        content_path = self.config.get("response_path", [])
+        content: Any = response_json
+        for key in content_path:
+            if isinstance(content, dict):
+                content = content[key]
+            else:
+                raise KeyError(f"Cannot resolve response path segment {key}")
+        if not isinstance(content, str):
+            content = json.dumps(content, ensure_ascii=False)
+        return LLMResponse(content=content, raw=response_json)
+
+
+LLM_CLIENTS: Dict[str, Type[BaseLLMClient]] = {
+    "echo": EchoLLMClient,
+    "http": HttpLLMClient,
+}
+
+
+class LLMManager:
+    """Singleton responsible for instantiating configured LLM clients."""
+
+    _instance: Optional["LLMManager"] = None
+    _lock = Lock()
+
+    def __new__(cls, config_path: Optional[Path] = None) -> "LLMManager":
+        with cls._lock:
+            if cls._instance is None:
+                cls._instance = super().__new__(cls)
+                cls._instance._build(config_path)
+        return cls._instance
+
+    def _build(self, config_path: Optional[Path]) -> None:
+        path = Path(config_path or (CONFIG_DIR / "llm_settings.yaml"))
+        self._models: Dict[str, BaseLLMClient] = {}
+        self._model_config: Dict[str, Dict[str, Any]] = {}
+        self._default_model = "echo"
+        if path.exists():
+            with path.open("r", encoding="utf-8") as fh:
+                config = yaml.safe_load(fh) or {}
+            self._default_model = config.get("default_model", "echo")
+            self._model_config = config.get("models", {})
+        else:
+            self._model_config = {"echo": {"provider": "echo", "parser": "text"}}
+
+    def _resolve_parser(self, parser_name: Optional[str]) -> BaseParser:
+        if not parser_name:
+            return TextParser()
+        try:
+            return ParserManager().get_parser(parser_name)
+        except KeyError:
+            return TextParser()
+
+    def _create_model(self, model_name: str) -> BaseLLMClient:
+        config = self._model_config.get(model_name)
+        if not config:
+            raise KeyError(f"Model {model_name} not defined in llm_settings.yaml")
+        provider = config.get("provider", "echo")
+        parser_name = config.get("parser")
+        parser = self._resolve_parser(parser_name)
+        options = config.get("options", {})
+        client_cls = LLM_CLIENTS.get(provider)
+        if not client_cls:
+            raise ValueError(f"Unsupported LLM provider: {provider}")
+        return client_cls(model_name, parser=parser, **options)
+
+    def get_llm_model(self, model_name: Optional[str] = None) -> BaseLLMClient:
+        target = model_name or self._default_model
+        if target not in self._models:
+            self._models[target] = self._create_model(target)
+        return self._models[target]
+try:
+    import httpx
+except ImportError:  # pragma: no cover - optional dependency
+    httpx = None

+ 56 - 0
app/llm/parser_manager.py

@@ -0,0 +1,56 @@
+"""Registry for converting LLM responses via pluggable parsers."""
+
+from __future__ import annotations
+
+from pathlib import Path
+from threading import Lock
+from typing import Dict, Optional, Type
+
+import yaml
+
+from app.const import CONFIG_DIR
+from .parsers import BaseParser, PARSER_REGISTRY
+
+
+class ParserManager:
+    """Lazy singleton keeping parser instances referenced by name."""
+
+    _instance: Optional["ParserManager"] = None
+    _lock = Lock()
+
+    def __new__(cls, config_path: Optional[Path] = None) -> "ParserManager":
+        with cls._lock:
+            if cls._instance is None:
+                cls._instance = super().__new__(cls)
+                cls._instance._build(config_path)
+        return cls._instance
+
+    def _build(self, config_path: Optional[Path]) -> None:
+        path = Path(config_path or (CONFIG_DIR / "parser_settings.yaml"))
+        self._parsers: Dict[str, BaseParser] = {}
+
+        self._register_defaults()
+        if path.exists():
+            self._load_custom_parsers(path)
+
+    def _register_defaults(self) -> None:
+        for name, parser_cls in PARSER_REGISTRY.items():
+            self._parsers[name] = parser_cls()
+
+    def _load_custom_parsers(self, path: Path) -> None:
+        with path.open("r", encoding="utf-8") as fh:
+            parser_settings = yaml.safe_load(fh) or {}
+        for name, config in parser_settings.items():
+            parser_type = config.get("type")
+            if not parser_type or parser_type not in PARSER_REGISTRY:
+                continue
+            parser_cls: Type[BaseParser] = PARSER_REGISTRY[parser_type]
+            self._parsers[name] = parser_cls(**config.get("options", {}))
+
+    def get_parser(self, name: str) -> BaseParser:
+        if name not in self._parsers:
+            raise KeyError(f"Parser {name} is not registered")
+        return self._parsers[name]
+
+    def register_parser(self, name: str, parser: BaseParser) -> None:
+        self._parsers[name] = parser

+ 61 - 0
app/llm/parsers.py

@@ -0,0 +1,61 @@
+"""Parsers used to convert raw LLM responses into structured payloads."""
+
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from typing import Any, Dict, Optional
+
+
+@dataclass
+class ParsedResult:
+    """Normalized structure returned by every parser."""
+
+    data: Any
+    metadata: Dict[str, Any]
+
+
+class BaseParser:
+    """Base class for all parsers."""
+
+    name: str = "base"
+
+    def parse(self, text: str, **_: Any) -> ParsedResult:  # pragma: no cover - abstract
+        raise NotImplementedError
+
+
+class JsonParser(BaseParser):
+    """Parser that tries to coerce the response into JSON."""
+
+    name = "json"
+
+    def __init__(self, root_key: Optional[str] = None) -> None:
+        self.root_key = root_key
+
+    def parse(self, text: str, **_: Any) -> ParsedResult:
+        content = text.strip()
+        if content.startswith("```"):
+            lines = content.splitlines()[1:-1]
+            content = "\n".join(lines)
+        if content.startswith("json"):
+            content = content[4:].lstrip()
+
+        payload = json.loads(content or "{}")
+        if self.root_key and isinstance(payload, dict):
+            payload = payload.get(self.root_key, payload)
+        return ParsedResult(data=payload, metadata={"parser": self.name})
+
+
+class TextParser(BaseParser):
+    """Return the verbatim text result."""
+
+    name = "text"
+
+    def parse(self, text: str, **_: Any) -> ParsedResult:
+        return ParsedResult(data=text.strip(), metadata={"parser": self.name})
+
+
+PARSER_REGISTRY = {
+    JsonParser.name: JsonParser,
+    TextParser.name: TextParser,
+}

+ 38 - 0
app/log/__init__.py

@@ -0,0 +1,38 @@
+"""Project level logger helper."""
+
+from __future__ import annotations
+
+import logging
+from logging.handlers import RotatingFileHandler
+from pathlib import Path
+from typing import Dict
+
+from app.const import LOG_ROOT
+
+
+class _Logger:
+    def __init__(self) -> None:
+        LOG_ROOT.mkdir(parents=True, exist_ok=True)
+        log_path = LOG_ROOT / "qa_pair.log"
+        self._logger = logging.getLogger("qa_pair")
+        if not self._logger.handlers:
+            self._logger.setLevel(logging.INFO)
+            handler = RotatingFileHandler(log_path, maxBytes=5_000_000, backupCount=2, encoding="utf-8")
+            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+            handler.setFormatter(formatter)
+            self._logger.addHandler(handler)
+        self._level_map: Dict[str, int] = {
+            "DEBUG": logging.DEBUG,
+            "INFO": logging.INFO,
+            "WARN": logging.WARNING,
+            "ERROR": logging.ERROR,
+        }
+
+    def log(self, message: str, level: str = "INFO") -> None:
+        lvl = self._level_map.get(level.upper(), logging.INFO)
+        self._logger.log(lvl, message)
+
+
+logger = _Logger()
+
+__all__ = ["logger"]

+ 42 - 0
app/log/prompts/generate_qa_pair/DB001/prompt.txt

@@ -0,0 +1,42 @@
+你是一名 BI 系统的问答构建助手,负责为每张卡片生成可落地执行的 SQL 及对应的用户问题。
+
+输入提供了仪表盘与卡片定义,包括:
+- 每张卡片的 SELECT/GROUP BY 语句(未带 WHERE/HAVING)
+- F 类型筛选器:用户可调整的业务过滤条件
+- D 类型筛选器:卡片的固化条件,必须始终应用
+
+任务要求:
+1. 仅针对 F 类型筛选器,挑选 1~3 个最有业务意义的组合(可以是 1 个或多个)。
+2. 将所选 F 筛选器与所有 D 类型固化条件一起拼接进 SQL 的 WHERE/HAVING 中,生成可直接执行的 SQL。
+3. 为每个生成的 SQL 写出清晰的用户问题与其带槽位的问题(sample_question 可包含占位符)。
+4. 严格输出 JSON 数组,每个元素字段:
+   - card_id
+   - question
+   - sample_question
+   - filter_ids: 所选的 F 类型筛选器 ID 列表(D 类型无需列出,会自动附加)
+   - sql: 拼接后的完整 SQL 文本
+
+输入内容:
+=======================================================================================
+仪表盘ID: DB001
+仪表盘名称: 运营仪表板01
+仪表盘描述: 机构画像概览
+文件夹全路径: /bbk/db001/folder_01
+
+---------------------------------------------------------------------------------------
+卡片(ID: C0001):
+卡片名称: 卡片01
+卡片描述: 卡片01 指标解析
+数据集ID: DS0001
+SELECT: SELECT channel, SUM(metric_value) AS metric_value FROM DS0001
+GROUP BY: GROUP BY channel
+
+卡片的过滤器条件:
+filter_id: PROVINCE_01
+条件语句: province_name = ?
+默认值: ALL
+选项: ['Province 1', 'Province 2', 'Province 3', 'Province 4']
+
+卡片的固化条件(D类型)
+filter_id: DATE_01
+条件语句: stat_date >= date '2023-01-01'

+ 30 - 0
app/log/test_results/generate_qa_pair_response.txt

@@ -0,0 +1,30 @@
+{
+  "returnCode": "SUCCESS",
+  "body": {
+    "dashboard_id": "DB001",
+    "bbk_id": "BBK002",
+    "card_ids": [
+      "C0001",
+      "C0002"
+    ],
+    "qa_pairs": [
+      {
+        "dashboard_id": "DB001",
+        "dashboard_name": "运营仪表板01",
+        "dashboard_desc": "机构画像概览",
+        "card_id": "C0001",
+        "card_name": "卡片01",
+        "card_desc": "卡片01 指标解析",
+        "dataset_id": "DS0001",
+        "question": "卡片01 的指标是多少?",
+        "answer": "SELECT channel, SUM(metric_value) AS metric_value FROM DS0001 WHERE stat_date >= date '2023-01-01' GROUP BY channel",
+        "question_with_slot": "请给出 卡片01 的最新指标",
+        "filter_ids": [
+          "DATE_01"
+        ],
+        "filters": [],
+        "defs": []
+      }
+    ]
+  }
+}

+ 17 - 0
app/main.py

@@ -0,0 +1,17 @@
+"""FastAPI application entry point."""
+
+from __future__ import annotations
+
+from fastapi import FastAPI
+
+from app.endpoints.generate_qa_pair import router as qa_router
+
+
+def create_app() -> FastAPI:
+    app = FastAPI(title="QA Pair Service", version="1.0.0")
+    app.include_router(qa_router, prefix="/api")
+    return app
+
+
+app = create_app()
+

+ 4 - 0
app/operators/__init__.py

@@ -0,0 +1,4 @@
+from .generate_qa_pair import generate_qa_pair
+from .get_dashboard_info import get_dashboard_info
+
+__all__ = ["generate_qa_pair", "get_dashboard_info"]

+ 309 - 0
app/operators/generate_qa_pair.py

@@ -0,0 +1,309 @@
+"""Operator entry point for generating QA pairs from dashboard metadata."""
+
+from __future__ import annotations
+
+import inspect
+import json
+import os
+from typing import Any, Dict, List, Tuple
+
+from app.const import LOG_ROOT
+from app.endpoints.schema import CardInfo, DashboardInfo, QAPair
+from app.llm import LLMManager
+from app.log import logger
+from app.pipeline import PipelineManager
+from app.prompt_manager import PromptManager
+
+
+def validate_input(input_args: Dict[str, Any]) -> str:
+    """Validate BBK info in ``input_args`` and return the optional user request."""
+
+    bbk = input_args["bbk"]
+    return input_args.get("user_request", "")
+
+
+def build_condition_dict(card: CardInfo) -> Dict[str, Any]:
+    """Build a searchable condition dictionary for the given card."""
+
+    condition_dict: Dict[str, Any] = {}
+
+    for flt in card.filters:
+        key = flt.filter_id.replace("'", "")
+        condition_dict[key] = {
+            "type": flt.type,
+            "条件语句": flt.where_clause,
+            "聚合条件": False,
+            "默认值": flt.default_value,
+            "选项": flt.options,
+        }
+
+    sql_where = card.sql_where or {}
+    for raw_key, raw_value in sql_where.items():
+        key = raw_key.replace("'", "")
+        if key not in condition_dict:
+            condition_dict[key] = {
+                "type": raw_value.get("type", "F"),
+                "条件语句": raw_value.get("exp", ""),
+                "聚合条件": raw_value.get("agg", False),
+                "默认值": raw_value.get("default", ""),
+                "选项": raw_value.get("options", []),
+            }
+
+    return condition_dict
+
+
+def build_card_content(card: CardInfo, condition_dict: Dict[str, Any]) -> List[str]:
+    """Build prompt content for a single card."""
+
+    content_lines = [
+        "---------------------------------------------------------------------------------------",
+        f"卡片(ID: {card.card_id}):",
+        f"卡片名称: {card.card_name}",
+        f"卡片描述: {card.card_desc}",
+        f"数据集ID: {card.dataset_id}",
+        f"SELECT: {card.sql_select}",
+    ]
+    if card.sql_groupby:
+        content_lines.append(f"GROUP BY: {card.sql_groupby}")
+
+    content_lines.append("\n卡片的过滤器条件:")
+    for key, value in condition_dict.items():
+        if value["type"] == "F":
+            content_lines.extend([
+                f"filter_id: {key}",
+                f"条件语句: {value['条件语句']}",
+            ])
+            if value["聚合条件"]:
+                content_lines.append(f"是否聚合: {value['聚合条件']}")
+            if value["默认值"]:
+                content_lines.append(f"默认值: {value['默认值']}")
+            if value["选项"]:
+                content_lines.append(f"选项: {value['选项']}")
+    content_lines.append("\n卡片的固化条件(D类型)")
+    for key, value in condition_dict.items():
+        if value["type"] == "D":
+            content_lines.extend([
+                f"filter_id: {key}",
+                f"条件语句: {value['条件语句']}",
+            ])
+
+    return content_lines
+
+
+def build_dashboard_content(dashboard_info: DashboardInfo) -> Tuple[str, Dict[str, Any]]:
+    """Aggregate the prompt text for the whole dashboard."""
+
+    content_lines = [
+        "=======================================================================================",
+        f"仪表盘ID: {dashboard_info.dashboard_id}",
+        f"仪表盘名称: {dashboard_info.dashboard_name}",
+        f"仪表盘描述: {dashboard_info.dashboard_desc}",
+        f"文件夹全路径: {dashboard_info.folder_path or '无'}\n",
+    ]
+
+    card_id_2_filters: Dict[str, Any] = {}
+    for card in dashboard_info.cards:
+        logger.log(
+            f"Processing card ID: {card.card_id} in dashboard ID: {dashboard_info.dashboard_id}",
+            level="DEBUG",
+        )
+        condition_dict = build_condition_dict(card)
+        card_id_2_filters[card.card_id] = condition_dict
+        content_lines.extend(build_card_content(card, condition_dict))
+
+    return "\n".join(content_lines), card_id_2_filters
+
+
+async def generate_prompt_content(
+    prompt_manager: PromptManager,
+    content: str,
+    user_request: str,
+):
+    """Render the prompt template with optional user request."""
+
+    context = {"content": content}
+    if user_request:
+        context["user_request"] = user_request
+        template = prompt_manager.get_prompt_template("generate_qa_pair_with_user_request")
+    else:
+        template = prompt_manager.get_prompt_template("generate_qa_pair")
+    return await template.ainvoke(context)
+
+
+def save_prompt_log(dashboard_id: str, prompt_text: str) -> None:
+    """Persist the generated prompt for debugging and auditing."""
+
+    log_dir = os.path.join(LOG_ROOT, "prompts", "generate_qa_pair", dashboard_id)
+    os.makedirs(log_dir, exist_ok=True)
+    log_file = os.path.join(log_dir, "prompt.txt")
+    with open(log_file, "w", encoding="utf-8") as file:
+        file.write(prompt_text)
+    logger.log(f"Prompt saved to {log_file}", level="INFO")
+
+
+def build_where_clauses(
+    condition_dict: Dict[str, Any],
+    filter_ids: List[str],
+) -> Tuple[str, str]:
+    """Build WHERE and HAVING clauses based on selected filters."""
+
+    where_statement, having_statement = "", ""
+
+    for key, value in condition_dict.items():
+        if value["type"] == "D" and key not in filter_ids:
+            filter_ids.append(key)
+
+    for filter_id in filter_ids:
+        if filter_id not in condition_dict:
+            logger.log(f"Filter ID {filter_id} not found in condition_dict", level="WARN")
+            continue
+        filter_dict = condition_dict[filter_id]
+        statement = filter_dict["条件语句"] + " AND "
+        if filter_dict["聚合条件"]:
+            having_statement += statement
+        else:
+            where_statement += statement
+
+    return where_statement.rstrip(" AND "), having_statement.rstrip(" AND ")
+
+
+def build_sql_statement(card: CardInfo, where_statement: str, having_statement: str) -> str:
+    """Combine the SQL fragments for the final executable statement."""
+
+    sql = card.sql_select
+    if where_statement:
+        sql += f" WHERE {where_statement} "
+    if card.sql_groupby:
+        sql += card.sql_groupby
+    if having_statement:
+        sql += f" HAVING {having_statement} "
+    return sql.replace("\\n", "\n")
+
+
+def generate_qa_pairs(
+    dashboard_info: DashboardInfo,
+    generated_qa: List[Dict[str, Any]],
+    card_id_2_filters: Dict[str, Any],
+) -> List[QAPair]:
+    """Combine LLM output with metadata to create ``QAPair`` objects."""
+
+    qa_pairs: List[QAPair] = []
+    card_map = {card.card_id: card for card in dashboard_info.cards}
+
+    for item in generated_qa:
+        try:
+            card_id = item.get("card_id", "")
+            if card_id not in card_map:
+                logger.log(
+                    f"Card ID {card_id} not found in dashboard {dashboard_info.dashboard_id}",
+                    level="WARN",
+                )
+                continue
+
+            card = card_map[card_id]
+            filter_ids = item.get("filter_ids", [])
+            condition_dict = card_id_2_filters[card_id]
+
+            where_statement, having_statement = build_where_clauses(condition_dict, filter_ids)
+            sql_statement = build_sql_statement(card, where_statement, having_statement)
+            logger.log(f"Generated SQL for card ID {card_id}:\n{sql_statement}", level="DEBUG")
+
+            qa_pairs.append(
+                QAPair(
+                    dashboard_id=dashboard_info.dashboard_id,
+                    dashboard_name=dashboard_info.dashboard_name,
+                    dashboard_desc=dashboard_info.dashboard_desc,
+                    card_id=card.card_id,
+                    card_name=card.card_name,
+                    card_desc=card.card_desc,
+                    dataset_id=card.dataset_id,
+                    question=item.get("question", ""),
+                    question_with_slot=item.get("sample_question", ""),
+                    answer=item.get("sql", sql_statement),
+                    filter_ids=filter_ids,
+                )
+            )
+        except Exception as exc:  # pragma: no cover - defensive logging
+            logger.log(f"Error generating QA pair for item {item}: {exc}", level="WARN")
+
+    return qa_pairs
+
+
+def _clean_llm_response(text: str) -> str:
+    """Strip optional Markdown fences from the LLM output."""
+
+    content = text.strip()
+    if content.startswith("```"):
+        lines = content.splitlines()
+        if len(lines) >= 2:
+            content = "\n".join(lines[1:-1])
+    if content.startswith("json"):
+        content = content[4:].lstrip()
+    return content
+
+
+async def generate_qa_pair(input_args: Dict[str, Any]) -> List[QAPair]:
+    """Generate QA pairs based on dashboard + card metadata."""
+
+    user_request = validate_input(input_args)
+    dashboard_info = input_args["get_dashboard_info"]
+
+    content, card_id_2_filters = build_dashboard_content(dashboard_info)
+
+    prompt = await generate_prompt_content(PromptManager(), content, user_request)
+    save_prompt_log(dashboard_info.dashboard_id, prompt.text)
+
+    llm = LLMManager().get_llm_model(
+        PipelineManager().get_node_config(inspect.currentframe().f_code.co_name)["model"]
+    )
+    resp = await llm.ainvoke(prompt.text)
+
+    try:
+        generated_qa = json.loads(_clean_llm_response(resp.content))
+    except json.JSONDecodeError as exc:
+        logger.log(f"JSON decode error: {exc}, primary response: {resp.content}", level="ERROR")
+        generated_qa = []
+
+    if not generated_qa:
+        # fallback: one QA per card with basic filter-less SQL
+        generated_qa = [
+            {
+                "card_id": card.card_id,
+                "question": f"{card.card_name} 的指标是多少?",
+                "sample_question": f"请给出 {card.card_name} 的最新指标",
+                "filter_ids": [],
+            }
+            for card in dashboard_info.cards
+        ]
+
+    return generate_qa_pairs(dashboard_info, generated_qa, card_id_2_filters)
+
+
+if __name__ == "__main__":  # pragma: no cover - manual smoke test
+    import asyncio
+
+    from app.operators.get_dashboard_info import get_dashboard_info
+    from app.const import CONFIG_DIR
+
+    pipline_setup_path = CONFIG_DIR / "pipline_settings.yaml"
+    _pipline_manager = PipelineManager(str(pipline_setup_path))
+
+    template_config_path = CONFIG_DIR / "prompt_template.yaml"
+    _prompt_manager = PromptManager(str(template_config_path))
+
+    llm_config_path = CONFIG_DIR / "llm_settings.yaml"
+    _llm_manager = LLMManager(str(llm_config_path))
+
+    test_data = {
+        "dashboard_id": "test_dashboard_001",
+        "card_ids": ["card_001", "card_002"],
+        "bbk": "default_bbk",
+        "user_request": "",
+    }
+
+    print("Getting dashboard info...")
+    dashboard = asyncio.run(get_dashboard_info(test_data))
+    test_data["get_dashboard_info"] = dashboard
+
+    result = asyncio.run(generate_qa_pair(test_data))
+    print(result)

+ 81 - 0
app/operators/get_dashboard_info.py

@@ -0,0 +1,81 @@
+"""Operator to assemble dashboard metadata from DuckDB for downstream prompts."""
+
+from __future__ import annotations
+
+import json
+from typing import Any, Dict, List
+
+from app.data_access import DuckDBClient
+from app.endpoints.schema import CardFilter, CardInfo, DashboardInfo
+
+
+def _build_filters(filter_rows: List[Dict[str, Any]]) -> Dict[str, List[CardFilter]]:
+    filters_by_card: Dict[str, List[CardFilter]] = {}
+    for row in filter_rows:
+        options_raw = row.get("options") or "[]"
+        try:
+            options = json.loads(options_raw)
+        except Exception:
+            options = []
+        card_filter = CardFilter(
+            card_id=row["card_id"],
+            filter_id=row["filter_id"],
+            type=row.get("filter_type", "F"),
+            where_clause=row.get("where_clause", ""),
+            default_value=row.get("default_value"),
+            options=options,
+        )
+        filters_by_card.setdefault(row["card_id"], []).append(card_filter)
+    return filters_by_card
+
+
+def get_dashboard_info(input_args: Dict[str, Any]) -> DashboardInfo:
+    """Retrieve dashboard info + card definitions + filters for given ids."""
+
+    dashboard_id = input_args.get("dashboard_id")
+    card_ids = input_args.get("card_ids") or []
+    bbk_id = input_args.get("bbk")
+    if not dashboard_id:
+        raise ValueError("dashboard_id is required in input_args")
+
+    client = DuckDBClient()
+    mapping_rows = client.fetch_dashboard_cards(dashboard_id, card_ids, bbk_id)
+    if not mapping_rows:
+        raise ValueError(f"Dashboard {dashboard_id} not found or no cards match")
+
+    target_card_ids = [row["card_id"] for row in mapping_rows]
+    card_defs = {row["card_id"]: row for row in client.fetch_card_definition(target_card_ids)}
+    filter_rows = client.fetch_card_filters(target_card_ids)
+    filters_by_card = _build_filters(filter_rows)
+
+    dashboard_meta = client.fetch_dashboard_info(dashboard_id) or {
+        "dashboard_id": dashboard_id,
+        "dashboard_name": mapping_rows[0].get("dashboard_name", ""),
+        "dashboard_desc": "",
+        "folder_path": "",
+    }
+
+    cards: List[CardInfo] = []
+    for row in mapping_rows:
+        card_id = row["card_id"]
+        card_def = card_defs.get(card_id, {})
+        cards.append(
+            CardInfo(
+                card_id=card_id,
+                card_name=card_def.get("card_name", row.get("card_name", "")),
+                card_desc=card_def.get("card_desc", ""),
+                dataset_id=row.get("dataset_id", ""),
+                sql_select=card_def.get("sql_select", ""),
+                sql_where={},
+                sql_groupby=card_def.get("sql_groupby", ""),
+                filters=filters_by_card.get(card_id, []),
+            )
+        )
+
+    return DashboardInfo(
+        dashboard_id=dashboard_meta.get("dashboard_id", dashboard_id),
+        dashboard_name=dashboard_meta.get("dashboard_name", ""),
+        dashboard_desc=dashboard_meta.get("dashboard_desc", ""),
+        folder_path=dashboard_meta.get("folder_path", ""),
+        cards=cards,
+    )

+ 3 - 0
app/pipeline/__init__.py

@@ -0,0 +1,3 @@
+from .pipeline_manager import PipelineManager
+
+__all__ = ["PipelineManager"]

+ 73 - 0
app/pipeline/pipeline_manager.py

@@ -0,0 +1,73 @@
+
+from collections import defaultdict, deque
+from pathlib import Path
+from threading import Lock
+from typing import List, Optional
+from typing_extensions import Self
+import yaml
+
+from app.const import CONFIG_DIR
+
+
+class PipelineManager:
+    _instance = None
+    _lock = Lock()
+
+    def __new__(cls, pipeline_setup_path: Optional[Path] = None) -> Self:
+        with cls._lock:
+            if cls._instance is None:
+                setup_path = Path(pipeline_setup_path or (CONFIG_DIR / "pipeline_settings.yaml"))
+                cls._instance = super(PipelineManager, cls).__new__(cls)
+                cls._instance.build(setup_path)
+        return cls._instance
+    
+    def build(self, pipeline_setup_path: Path):
+
+        with open(pipeline_setup_path, "r", encoding='utf8') as f:
+            self.pipeline_setup = yaml.safe_load(f)
+
+        self._nodes = []
+        self._node_map = {}
+        for key in self.pipeline_setup:
+            setattr(self, key, self.pipeline_setup[key])
+            self._nodes.append(key)
+            self._node_map[key] = self.pipeline_setup[key]
+        
+        self._topological_sort()
+
+    def _topological_sort(self):
+        graph = defaultdict(list)
+        in_degree = {node: 0 for node in self._nodes}
+
+        for key, setup in self._node_map.items():
+            for dep in setup["dependencies"]:
+                graph[dep].append(key)
+                in_degree[key] += 1
+        
+        queue = deque([node for node in self._nodes if in_degree[node] == 0])
+        sorted_order = []
+
+        while queue:
+            current = queue.popleft()
+            sorted_order.append(current)
+            for neighbor in graph[current]:
+                in_degree[neighbor] -= 1
+                if in_degree[neighbor] == 0:
+                    queue.append(neighbor)
+        
+        if len(sorted_order) != len(self._nodes):
+            raise ValueError("Cycle detected in pipeline dependencies")
+        self._nodes = sorted_order
+
+    def get_node_config(self, node_name: str) -> dict:
+        if hasattr(self, node_name):
+            return getattr(self, node_name)
+        raise ValueError(f"Node {node_name} not found in pipeline configuration")
+
+    @property
+    def nodes(self):
+        return self._nodes
+    
+    @property
+    def node_map(self):
+        return self._node_map

+ 75 - 0
app/pipeline/workflow.py

@@ -0,0 +1,75 @@
+import asyncio
+from dataclasses import dataclass, field
+import inspect
+import json
+from typing import Any, Dict, List
+import app.operators
+from app.pipeline.pipeline_manager import PipelineManager
+
+function_map = {name: getattr(app.operators, name) for name in dir(app.operators) if callable(getattr(app.operators, name))}
+
+@dataclass
+class Node(object):
+    name: str
+    dependencies: List[str]
+    result: Any = None
+    extra_fields: Dict[str, Any] = field(default_factory=dict)
+
+    def __init__(self, name: str, dependencies: List[str], **kwargs):
+        self.name = name
+        self.dependencies = dependencies
+        self.extra_fields = kwargs
+        self.last_node = kwargs.get("last_node", False)
+
+class Workflow:
+    def __init__(self) -> None:
+        self.sorted_nodes = [Node(name=name, **PipelineManager().node_map[name]) for name in PipelineManager().nodes]
+        self.node_map: Dict[str, Node] = {node.name: node for node in self.sorted_nodes}
+        self.result = {}
+        self.lock = asyncio.Lock()
+        self.queue = asyncio.Queue()
+    
+    async def execute_node(self, node: Node):
+        func = function_map[node.name]
+
+        res = func(self.result)
+        if inspect.isawaitable(res):
+            try:
+                res = await res
+            except Exception as e:
+                res = {"error": str(e)}
+
+        node.result = res
+        async with self.lock:
+            self.result.update({node.name: res})
+            if node.last_node:
+                self.result.update({"result": res})
+
+        if inspect.isasyncgen(res):
+            async for item in res:
+                await self.queue.put(item)
+    
+    async def execute_workflow(self, input_args: Dict[str, Any], stream = False):
+        self.result.update(input_args)
+
+        level_map = {node.name: 0 for node in self.sorted_nodes}
+        for node in self.sorted_nodes:
+            if node.dependencies:
+                level_map[node.name] = max([level_map[dep] for dep in node.dependencies]) + 1
+        
+        max_level = max(level_map.values())
+
+        for level in range(max_level + 1):
+            current_level_nodes = [self.node_map[name] for name, lvl in level_map.items() if lvl == level]
+            tasks = [self.execute_node(node) for node in current_level_nodes]
+            try:
+                await asyncio.gather(*tasks)
+            except Exception as e:
+                print(f"Error executing level {level}: {e}")
+                raise e
+        
+        if stream:
+            await self.queue.put({"event": "end", "data": json.dumps({"status": "done", "answer": "[Done]"}, ensure_ascii=False)})
+            await self.queue.put(None)
+        else:
+            return self.result["result"]

+ 114 - 0
app/prompt_manager.py

@@ -0,0 +1,114 @@
+"""Prompt management utilities used across pipeline nodes."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from threading import Lock
+from typing import Dict, Iterable, Optional
+
+import yaml
+
+from app.const import CONFIG_DIR, PROMPT_DIR
+
+
+DEFAULT_PROMPTS = {
+    "generate_qa_pair": {
+        "template": (
+            "你是一名BI分析助手,需要根据以下仪表盘与卡片的定义信息,"
+            "结合卡片的过滤器条件,为每张卡片生成多个用户问题及其对应的SQL查询。\n"
+            "仪表盘与卡片内容:\n{content}\n"
+            "请输出JSON数组,每个元素包含 card_id, question, sample_question, filter_ids。"
+        ),
+        "variables": ["content"],
+    },
+    "generate_qa_pair_with_user_request": {
+        "template": (
+            "以下是用户额外的指标需求:{user_request}\n"
+            "结合仪表盘内容生成更贴近需求的问题:\n{content}\n"
+            "输出格式同 generate_qa_pair。"
+        ),
+        "variables": ["user_request", "content"],
+    },
+}
+
+
+@dataclass
+class PromptRenderResult:
+    text: str
+    template_name: str
+
+
+class PromptTemplate:
+    """Simple template wrapper with async friendly render helper."""
+
+    def __init__(self, name: str, template: str, variables: Optional[Iterable[str]] = None) -> None:
+        self.name = name
+        self.template = template
+        self.variables = list(variables or [])
+
+    def render(self, context: Dict[str, str]) -> str:
+        missing = [var for var in self.variables if var not in context]
+        if missing:
+            raise KeyError(f"Missing variables {missing} for template {self.name}")
+        safe_context = {key: str(value) for key, value in context.items()}
+        return self.template.format(**safe_context)
+
+    async def ainvoke(self, context: Dict[str, str]) -> PromptRenderResult:
+        return PromptRenderResult(text=self.render(context), template_name=self.name)
+
+
+class PromptManager:
+    """Singleton responsible for loading prompt templates from disk or defaults."""
+
+    _instance: Optional["PromptManager"] = None
+    _lock = Lock()
+
+    def __new__(cls, config_path: Optional[Path] = None) -> "PromptManager":
+        with cls._lock:
+            if cls._instance is None:
+                cls._instance = super().__new__(cls)
+                cls._instance._build(config_path)
+        return cls._instance
+
+    def _build(self, config_path: Optional[Path]) -> None:
+        path = Path(config_path or (CONFIG_DIR / "prompt_template.yaml"))
+        self._templates: Dict[str, PromptTemplate] = {}
+        if path.exists():
+            self._load_from_config(path)
+        else:
+            self._load_defaults()
+
+    def _load_defaults(self) -> None:
+        for name, info in DEFAULT_PROMPTS.items():
+            self._templates[name] = PromptTemplate(
+                name=name,
+                template=info["template"],
+                variables=info.get("variables"),
+            )
+
+    def _load_from_config(self, path: Path) -> None:
+        with path.open("r", encoding="utf-8") as fh:
+            prompt_config = yaml.safe_load(fh) or {}
+        for name, info in prompt_config.items():
+            template_text = self._resolve_template_source(info)
+            self._templates[name] = PromptTemplate(
+                name=name,
+                template=template_text,
+                variables=info.get("variables", []),
+            )
+
+    def _resolve_template_source(self, info: Dict[str, str]) -> str:
+        if "template" in info:
+            return info["template"]
+        if "path" in info:
+            tpl_path = info["path"]
+            resolved = (PROMPT_DIR / tpl_path) if not Path(tpl_path).is_absolute() else Path(tpl_path)
+            with resolved.open("r", encoding="utf-8") as fh:
+                return fh.read()
+        raise ValueError("Prompt template must define either 'template' or 'path'")
+
+    def get_prompt_template(self, name: str) -> PromptTemplate:
+        if name not in self._templates:
+            raise KeyError(f"Prompt template {name} not found")
+        return self._templates[name]

+ 8 - 0
config/pipeline_settings.yaml

@@ -0,0 +1,8 @@
+get_dashboard_info:
+  dependencies: []
+  last_node: false
+
+generate_qa_pair:
+  dependencies: ["get_dashboard_info"]
+  last_node: true
+  model: "echo"

+ 7 - 0
config/prompt_template.yaml

@@ -0,0 +1,7 @@
+generate_qa_pair:
+  path: "generate_qa_pair.txt"
+  variables: ["content"]
+
+generate_qa_pair_with_user_request:
+  path: "generate_qa_pair_with_user_request.txt"
+  variables: ["content", "user_request"]

+ 152 - 0
data/data_test.ipynb

@@ -0,0 +1,152 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "id": "e7755adc",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<div>\n",
+       "<style scoped>\n",
+       "    .dataframe tbody tr th:only-of-type {\n",
+       "        vertical-align: middle;\n",
+       "    }\n",
+       "\n",
+       "    .dataframe tbody tr th {\n",
+       "        vertical-align: top;\n",
+       "    }\n",
+       "\n",
+       "    .dataframe thead th {\n",
+       "        text-align: right;\n",
+       "    }\n",
+       "</style>\n",
+       "<table border=\"1\" class=\"dataframe\">\n",
+       "  <thead>\n",
+       "    <tr style=\"text-align: right;\">\n",
+       "      <th></th>\n",
+       "      <th>name</th>\n",
+       "    </tr>\n",
+       "  </thead>\n",
+       "  <tbody>\n",
+       "    <tr>\n",
+       "      <th>0</th>\n",
+       "      <td>card_dataset_dashboard_mapping</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>1</th>\n",
+       "      <td>card_filter_info</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>2</th>\n",
+       "      <td>card_info</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>3</th>\n",
+       "      <td>dashboard_info</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>4</th>\n",
+       "      <td>dataset_ddl</td>\n",
+       "    </tr>\n",
+       "  </tbody>\n",
+       "</table>\n",
+       "</div>"
+      ],
+      "text/plain": [
+       "                             name\n",
+       "0  card_dataset_dashboard_mapping\n",
+       "1                card_filter_info\n",
+       "2                       card_info\n",
+       "3                  dashboard_info\n",
+       "4                     dataset_ddl"
+      ]
+     },
+     "execution_count": 6,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "import pandas as pd\n",
+    "import duckdb\n",
+    "\n",
+    "conn = duckdb.connect(database = \"./duckdb/bi_metadata.duckdb\")\n",
+    "conn.sql(\"show tables\").df()\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "id": "c8f35aba",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "┌────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐\n",
+       "│ dataset_id │                                                     dataset_ddl                                                      │\n",
+       "│  varchar   │                                                       varchar                                                        │\n",
+       "├────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤\n",
+       "│ DS0001     │ CREATE TABLE ds0001 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0002     │ CREATE TABLE ds0002 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0003     │ CREATE TABLE ds0003 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0004     │ CREATE TABLE ds0004 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0005     │ CREATE TABLE ds0005 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0006     │ CREATE TABLE ds0006 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0007     │ CREATE TABLE ds0007 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0008     │ CREATE TABLE ds0008 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0009     │ CREATE TABLE ds0009 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0010     │ CREATE TABLE ds0010 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│   ·        │                                                          ·                                                           │\n",
+       "│   ·        │                                                          ·                                                           │\n",
+       "│   ·        │                                                          ·                                                           │\n",
+       "│ DS0016     │ CREATE TABLE ds0016 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0017     │ CREATE TABLE ds0017 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0018     │ CREATE TABLE ds0018 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0019     │ CREATE TABLE ds0019 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0020     │ CREATE TABLE ds0020 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0021     │ CREATE TABLE ds0021 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0022     │ CREATE TABLE ds0022 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0023     │ CREATE TABLE ds0023 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0024     │ CREATE TABLE ds0024 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "│ DS0025     │ CREATE TABLE ds0025 (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n); │\n",
+       "├────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤\n",
+       "│ 25 rows (20 shown)                                                                                                      2 columns │\n",
+       "└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘"
+      ]
+     },
+     "execution_count": 12,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "conn.sql(\"select * from dataset_ddl\")\n"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": ".venv (3.13.3)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.13.3"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}

二進制
data/duckdb/bi_metadata.duckdb


+ 179 - 0
data/simulate_bi_metadata.ipynb

@@ -0,0 +1,179 @@
+{
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {},
+      "source": [
+        "# DuckDB 仿真数据生成\n",
+        "\n",
+        "根据题目给定的五张表结构生成不少于 20 条的测试数据,并写入 `data/duckdb/bi_metadata.duckdb`。代码仅依赖标准库与 duckdb,避免外部网络。"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {},
+      "outputs": [],
+      "source": [
+        "from pathlib import Path\n",
+        "import random\n",
+        "import json\n",
+        "\n",
+        "import duckdb\n",
+        "\n",
+        "# 路径与随机种子\n",
+        "BASE_DIR = Path('data')\n",
+        "DB_PATH = BASE_DIR / 'duckdb' / 'bi_metadata.duckdb'\n",
+        "DB_PATH.parent.mkdir(parents=True, exist_ok=True)\n",
+        "random.seed(2024)\n",
+        "DB_PATH"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {},
+      "outputs": [],
+      "source": [
+        "# 构造模拟数据\n",
+        "bbk_ids = [f'BBK{idx:03d}' for idx in range(1, 6)]\n",
+        "dimensions = ['region', 'channel', 'product_line', 'customer_tier', 'province']\n",
+        "group_dims = ['stat_date', 'region', 'channel']\n",
+        "desc_samples = ['收入趋势', '用户留存', '渠道质量', '机构画像', '产品分析']\n",
+        "\n",
+        "dashboards = []\n",
+        "cards = []\n",
+        "mappings = []\n",
+        "filters = []\n",
+        "datasets = []\n",
+        "\n",
+        "card_counter = 1\n",
+        "dataset_counter = 1\n",
+        "dataset_max = 25  # 保证数据集条数 >=20\n",
+        "\n",
+        "for dash_idx in range(1, 6):\n",
+        "    dashboard_id = f'DB{dash_idx:03d}'\n",
+        "    dashboard_name = f'运营仪表板{dash_idx:02d}'\n",
+        "    dashboards.append({\n",
+        "        'dashboard_id': dashboard_id,\n",
+        "        'dashboard_name': dashboard_name,\n",
+        "        'dashboard_desc': random.choice(desc_samples) + '概览',\n",
+        "        'folder_path': f'/bbk/{dashboard_id.lower()}/folder_{dash_idx:02d}',\n",
+        "    })\n",
+        "\n",
+        "    for _ in range(5):  # 每个仪表板 5 张卡片,共 25 行\n",
+        "        card_id = f'C{card_counter:04d}'\n",
+        "        dataset_id = f'DS{dataset_counter:04d}'\n",
+        "        dataset_counter = dataset_counter % dataset_max + 1\n",
+        "        card_name = f'卡片{card_counter:02d}'\n",
+        "        bbk_id = random.choice(bbk_ids)\n",
+        "        select_field = random.choice(group_dims)\n",
+        "        sql_select = f'SELECT {select_field}, SUM(metric_value) AS metric_value FROM {dataset_id}'\n",
+        "        sql_groupby = f'GROUP BY {select_field}'\n",
+        "        sql_where = \"stat_date >= date '2023-01-01'\"\n",
+        "\n",
+        "        cards.append({\n",
+        "            'card_id': card_id,\n",
+        "            'card_name': card_name,\n",
+        "            'card_desc': f'{card_name} 指标解析',\n",
+        "            'sql_select': sql_select,\n",
+        "            'sql_where': sql_where,\n",
+        "            'sql_groupby': sql_groupby,\n",
+        "        })\n",
+        "\n",
+        "        mappings.append({\n",
+        "            'card_id': card_id,\n",
+        "            'dashboard_id': dashboard_id,\n",
+        "            'dataset_id': dataset_id,\n",
+        "            'dashboard_name': dashboard_name,\n",
+        "            'card_name': card_name,\n",
+        "            'dataset_name': f'{dataset_id}_data',\n",
+        "            'bbk_id': bbk_id,\n",
+        "        })\n",
+        "\n",
+        "        dim = random.choice(dimensions)\n",
+        "        filters.append({\n",
+        "            'card_id': card_id,\n",
+        "            'filter_id': f'{dim.upper()}_{card_counter:02d}',\n",
+        "            'filter_type': 'F',\n",
+        "            'where_clause': f'{dim}_name = ?',\n",
+        "            'default_value': 'ALL',\n",
+        "            'options': json.dumps([f\"{dim.title()} {opt}\" for opt in range(1, 5)]),\n",
+        "        })\n",
+        "        filters.append({\n",
+        "            'card_id': card_id,\n",
+        "            'filter_id': f'DATE_{card_counter:02d}',\n",
+        "            'filter_type': 'D',\n",
+        "            'where_clause': \"stat_date >= date '2023-01-01'\",\n",
+        "            'default_value': '2023-01-01',\n",
+        "            'options': json.dumps([]),\n",
+        "        })\n",
+        "\n",
+        "        datasets.append({\n",
+        "            'dataset_id': dataset_id,\n",
+        "            'dataset_ddl': f\"CREATE TABLE {dataset_id.lower()} (\\n    bbk_id VARCHAR,\\n    stat_date DATE,\\n    metric_value DOUBLE,\\n    dimension VARCHAR\\n);\",\n",
+        "        })\n",
+        "\n",
+        "        card_counter += 1\n",
+        "\n",
+        "len(cards), len(mappings), len(filters), len(datasets), len(dashboards)\n"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {},
+      "outputs": [],
+      "source": [
+        "# 创建表并插入数据\n",
+        "con = duckdb.connect(str(DB_PATH))\n",
+        "\n",
+        "con.execute(\"\nCREATE OR REPLACE TABLE card_dataset_dashboard_mapping (\n    card_id VARCHAR,\n    dashboard_id VARCHAR,\n    dataset_id VARCHAR,\n    dashboard_name VARCHAR,\n    card_name VARCHAR,\n    dataset_name VARCHAR,\n    bbk_id VARCHAR\n);\n\")\n",
+        "\n",
+        "con.execute(\"\nCREATE OR REPLACE TABLE card_info (\n    card_id VARCHAR,\n    card_name VARCHAR,\n    card_desc VARCHAR,\n    sql_select VARCHAR,\n    sql_where VARCHAR,\n    sql_groupby VARCHAR\n);\n\")\n",
+        "\n",
+        "con.execute(\"\nCREATE OR REPLACE TABLE card_filter_info (\n    card_id VARCHAR,\n    filter_id VARCHAR,\n    filter_type VARCHAR,\n    where_clause VARCHAR,\n    default_value VARCHAR,\n    options VARCHAR\n);\n\")\n",
+        "\n",
+        "con.execute(\"\nCREATE OR REPLACE TABLE dashboard_info (\n    dashboard_id VARCHAR,\n    dashboard_name VARCHAR,\n    dashboard_desc VARCHAR,\n    folder_path VARCHAR\n);\n\")\n",
+        "\n",
+        "con.execute(\"\nCREATE OR REPLACE TABLE dataset_ddl (\n    dataset_id VARCHAR,\n    dataset_ddl VARCHAR\n);\n\")\n",
+        "\n",
+        "con.executemany(\"INSERT INTO card_dataset_dashboard_mapping VALUES (?,?,?,?,?,?,?)\",\n                [(m['card_id'], m['dashboard_id'], m['dataset_id'], m['dashboard_name'], m['card_name'], m['dataset_name'], m['bbk_id']) for m in mappings])\n",
+        "con.executemany(\"INSERT INTO card_info VALUES (?,?,?,?,?,?)\",\n                [(c['card_id'], c['card_name'], c['card_desc'], c['sql_select'], c['sql_where'], c['sql_groupby']) for c in cards])\n",
+        "con.executemany(\"INSERT INTO card_filter_info VALUES (?,?,?,?,?,?)\",\n                [(f['card_id'], f['filter_id'], f['filter_type'], f['where_clause'], f['default_value'], f['options']) for f in filters])\n",
+        "con.executemany(\"INSERT INTO dashboard_info VALUES (?,?,?,?)\",\n                [(d['dashboard_id'], d['dashboard_name'], d['dashboard_desc'], d['folder_path']) for d in dashboards])\n",
+        "con.executemany(\"INSERT INTO dataset_ddl VALUES (?,?)\",\n                [(d['dataset_id'], d['dataset_ddl']) for d in datasets])\n",
+        "\n",
+        "con.close()\n",
+        "'tables created'\n"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {},
+      "outputs": [],
+      "source": [
+        "# 校验行数\n",
+        "with duckdb.connect(str(DB_PATH)) as con:\n",
+        "    for tbl in ['card_dataset_dashboard_mapping', 'card_info', 'card_filter_info', 'dashboard_info', 'dataset_ddl']:\n",
+        "        cnt = con.execute(f'SELECT COUNT(*) FROM {tbl}').fetchone()[0]\n",
+        "        print(tbl, cnt)\n",
+        "    print('示例行:', con.execute('SELECT * FROM card_dataset_dashboard_mapping LIMIT 3').fetchall())\n"
+      ]
+    }
+  ],
+  "metadata": {
+    "kernelspec": {
+      "display_name": "Python 3",
+      "language": "python",
+      "name": "python3"
+    },
+    "language_info": {
+      "name": "python",
+      "version": "3.11"
+    }
+  },
+  "nbformat": 4,
+  "nbformat_minor": 5
+}

+ 96 - 0
docs/software_requirements.md

@@ -0,0 +1,96 @@
+# 智能问答模块软件需求说明书
+
+## 1. 总体概述
+- 本系统外挂在现有 BI 平台,负责解析仪表板、卡片以及机构(bbk)维度下的问答需求,自动生成 SQL 并在 BI 系统中执行。 
+- 系统提供 FastAPI 后端接口,供前端或其他服务发起“问题生成/确认”流程,并将确认后的问答对写入 parquet 文件,用于离线训练或直接答复。
+- 全部服务运行在后台,无前端组件;所有对话上下文均来源于 DuckDB 中的 BI 元数据表及机构配置。
+
+## 2. 数据结构要求
+1. **卡片-数据集-仪表板映射表**:字段 `card_id、dashboard_id、dataset_id、dashboard_name、card_name、dataset_name、bbk_id`。作为主数据表,保证数据有效。 
+2. **卡片信息表**:字段 `card_id、card_name、card_desc、sql_select、sql_where、sql_groupby` 描述卡片的 SQL 形态。 
+3. **卡片筛选器信息表**:字段 `card_id、filter_id、filter_type(F/D)、where_clause、default_value、options` 描述可选筛选器与内置过滤条件。 
+4. **仪表板信息表**:字段 `dashboard_id、dashboard_name、dashboard_desc、folder_path` 描述仪表板元数据。 
+5. **数据集 DDL 表**:字段 `dataset_id、dataset_ddl`,按 HiveSQL 规范存放。 
+
+这些结构均存放于 DuckDB,所有查询均基于机构编号(bbk/bbk_id)过滤。
+
+## 3. 系统架构
+- **接口层**:`app/endpoints` 下的 FastAPI `generate_qa_pair` 接口作为唯一对外入口。
+- **工作流层**:`PipelineManager` + `Workflow` 负责解析配置化节点依赖,并串联 operator。
+- **Operator 层**:`generate_qa_pair` operator 协调数据读取、prompt 渲染、LLM 调用与结果解析。
+- **支撑模块**:
+  - `LLMManager` 统一管理不同模型提供方与解析策略。
+  - `PromptManager` 负责 prompt 模板的存储、渲染和复用。
+  - `ParserManager`/`parsers` 定义 LLM 输出解析策略。
+  - `DuckDBClient` 提供线程安全的数据查询封装。
+  - 日志模块 `app.log` 负责统一输出。
+  - 常量模块 `app.const` 维护目录、阈值等。
+
+模块间关系:接口层调用 `Workflow`;`Workflow` 根据配置调用 operator;operator 内部组合数据访问模块、Prompt/LLM 管理器与解析器。最终结果写入日志目录与 parquet 文件。
+
+## 4. 模块划分与实现要点
+
+### 4.1 接口模块(FastAPI)
+- 负责接收 `QARequest`,校验卡片数量不超过 `MAX_INPUT_CARD_IDS`。
+- 将 `bbk_id/dashbord/card_ids/user_request` 打包成 `input_args` 交给 `Workflow`。
+- 返回 `QAResponse`,包含仪表板、机构、卡片 ID 及问答列表。
+
+### 4.2 工作流模块
+- `PipelineManager`:从 `config/pipeline_settings.yaml` 读取节点定义,生成拓扑顺序。
+- `Workflow`:按照拓扑依次执行 operator,支持异步/流式输出。
+- 节点之间通过 `workflow.result` 共享数据,`last_node` 节点需写入最终结果。
+
+### 4.3 Operator:generate_qa_pair
+- 从 `DuckDBClient` 读取仪表板、卡片、筛选器及 DDL 元数据,组装为 `DashboardInfo`。
+- 使用 `PromptManager` 渲染 `generate_qa_pair` 或带用户偏好模板。
+- 调用 `LLMManager` 按节点配置选择模型,获取 JSON 问答列表,并借助 `ParserManager` 解析。
+- 将过滤条件转换为 WHERE/HAVING 语句,拼接最终 SQL,构造 `QAPair`。
+- 记录 prompt 文本和生成结果日志,并触发 parquet 落盘(后续模块负责)。
+
+### 4.4 LLM 管理模块
+- `LLMManager` 为单例,读取 `config/llm_settings.yaml`。
+- 支持多种 provider:默认 `echo`(回声,便于测试)、`http`(通过 HTTP 调用自建/第三方 LLM 服务)。
+- 每个模型可指定解析器(如 `json`、`text`),通过 `ParserManager` 统一管理。
+- 暴露 `get_llm_model(model_name)`,operator 根据 pipeline 配置取用。
+
+### 4.5 Prompt 管理模块
+- `PromptManager` 读取 `config/prompt_template.yaml` 或使用内置默认模板。
+- 模板可存放于 `prompts/` 目录或直接配置在 YAML 中,支持变量占位与必填校验。
+- `PromptTemplate.ainvoke(context)` 异步渲染并返回文本,供 operator 直接写入日志或传给 LLM。
+
+### 4.6 解析器模块
+- `parsers.BaseParser` 抽象解析器接口。
+- 内置 `JsonParser`(去除 Markdown 包裹、解析 JSON)与 `TextParser`(直接返回文本)。
+- `ParserManager` 允许通过配置注册自定义解析器或重写默认行为。
+
+### 4.7 数据访问模块
+- `DuckDBClient` 封装了线程安全查询方法,支持:
+  - `fetch_dashboard_cards`:根据 `dashboard_id`、`card_ids`、`bbk_id` 过滤卡片。
+  - `fetch_card_definition`:获取卡片 SQL 定义。
+  - `fetch_card_filters`:获取筛选器/固化条件。
+  - `fetch_dashboard_info`、`fetch_dataset_ddl`:补全仪表板及数据集信息。
+- 所有查询返回字段-值 dict 列表,以供 Pydantic 模型构造。
+
+### 4.8 存储与日志
+- 日志目录 `app/log`,通过 `app.log.logger` 写入,支持级别控制。
+- Prompt 与问答日志写入 `LOG_ROOT/prompts/...` 便于审计。
+- 经过人工确认的问题将由后续节点序列化到 parquet,文件路径遵循 `DATA_DIR/parquet/{bbk_id}/{dashboard_id}/`。
+
+## 5. 业务流程
+1. 用户在 BI 前端选定仪表板、卡片与机构,触发 API。 
+2. 接口层调用 `Workflow`;`get_dashboard_info` 节点(后续实现)使用 `DuckDBClient` 读取所需元数据。 
+3. `generate_qa_pair` operator 构建 prompt,调用 LLM 生成若干候选问题和过滤条件。 
+4. 系统将问答转成 SQL,连同 slot 化问题写入返回体与 parquet,日志记录 prompt 与 SQL。 
+5. 后续模块可基于这些问答对执行真实查询或训练问答模型。
+
+## 6. 非功能需求
+- **性能**:单次请求支持最多 3 张卡片(可通过常量调整)。DuckDB 查询需在 1 秒内返回元数据。 
+- **扩展性**:LLM 与 Prompt 均通过配置驱动,可为不同机构配置不同模型或模板。 
+- **可观测性**:日志必须包含 request_id、bbk_id、dashboard_id 以便审计。 
+- **安全**:所有外部 LLM HTTP 调用需支持自定义 Header,用于透传鉴权 token。 
+
+## 7. 开发注意事项
+- 任何变量或字段中包含 `bbk/bbk_id` 均指机构编号。 
+- 当 YAML/模板缺失时,模块会回退到默认模板与 Echo LLM,方便在无外部依赖下开发调试。 
+- DuckDB 表结构需与上述字段保持一致,字段命名采用 snake_case。
+

+ 45 - 0
field_similarity.py

@@ -0,0 +1,45 @@
+def _longest_common_substring_length(a, b):
+    if not a or not b:
+        return 0
+
+    prev = [0] * (len(b) + 1)
+    max_len = 0
+    for i in range(1, len(a) + 1):
+        curr = [0] * (len(b) + 1)
+        for j in range(1, len(b) + 1):
+            if a[i - 1] == b[j - 1]:
+                curr[j] = prev[j - 1] + 1
+                if curr[j] > max_len:
+                    max_len = curr[j]
+        prev = curr
+    return max_len
+
+
+def analyze_fields(question, field_list):
+    if question is None or field_list is None:
+        return {}
+
+    fields = field_list.split("@@")
+    ratios = []
+    field_count = 0
+
+    for field in fields:
+        if not field:
+            continue
+        field_count += 1
+        lcs_len = _longest_common_substring_length(question, field)
+        ratios.append(lcs_len / len(field))
+
+    if field_count == 0:
+        return {}
+
+    non_zero = sum(1 for r in ratios if r != 0)
+    avg = sum(ratios) / field_count
+    max_ratio = max(ratios) if ratios else 0
+
+    return {
+        "field_count": field_count,
+        "non_zero_count": non_zero,
+        "avg_ratio": avg,
+        "max_ratio": max_ratio,
+    }

+ 7 - 0
gdb_utils/__init__.py

@@ -0,0 +1,7 @@
+"""
+Utility module for database helpers.
+"""
+
+from .opengauss_pool import OpenGaussConnectionPool, ConnectionPoolConfig
+
+__all__ = ["OpenGaussConnectionPool", "ConnectionPoolConfig"]

+ 333 - 0
gdb_utils/opengauss_pool.py

@@ -0,0 +1,333 @@
+"""
+Lightweight connection pool built on top of py_opengauss.
+
+The pool keeps a configurable number of connections ready, validates them on
+borrow when requested, performs periodic health checks/keepalives, and discards
+connections that fail SQL execution or exceed idle/lifetime thresholds.
+"""
+
+from __future__ import annotations
+
+import logging
+import threading
+import time
+from collections import deque
+from contextlib import contextmanager
+from dataclasses import dataclass
+from typing import Any, Deque, Dict, Optional
+
+import py_opengauss
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ConnectionPoolConfig:
+    dsn: str
+    min_size: int = 1
+    max_size: int = 10
+    idle_timeout: float = 300.0
+    max_lifetime: float = 3600.0
+    test_on_borrow: bool = True
+    test_sql: str = "SELECT 1"
+    keepalive: bool = True
+    keepalive_interval: float = 60.0
+    health_check_interval: float = 30.0
+    connect_kwargs: Optional[Dict[str, Any]] = None
+
+
+@dataclass
+class _ConnectionEntry:
+    conn: Any
+    created_at: float
+    last_used: float
+    last_check: float
+
+
+class OpenGaussConnectionPool:
+    """
+    Thread-safe connection pool for py_opengauss.
+
+    Borrowed connections are returned directly without wrappers. When an error
+    is detected during SQL execution, callers should mark the connection as
+    broken via `return_connection(conn, had_error=True)` to evict it quickly.
+    """
+
+    def __init__(self, config: ConnectionPoolConfig):
+        if config.min_size < 0 or config.max_size <= 0:
+            raise ValueError("Pool sizes must be positive")
+        if config.min_size > config.max_size:
+            raise ValueError("min_size cannot be greater than max_size")
+
+        self._config = config
+        self._lock = threading.Lock()
+        self._condition = threading.Condition(self._lock)
+        self._available: Deque[_ConnectionEntry] = deque()
+        self._in_use: Dict[int, _ConnectionEntry] = {}
+        self._total = 0
+        self._closed = False
+
+        self._housekeeper = threading.Thread(
+            target=self._housekeeping, name="opengauss-pool-housekeeper", daemon=True
+        )
+
+        self._initial_fill()
+        self._housekeeper.start()
+
+    def borrow(self, timeout: Optional[float] = None) -> Any:
+        """
+        Borrow a connection from the pool.
+
+        If test_on_borrow is enabled, the connection will be validated with the
+        configured SQL before being returned.
+        """
+        deadline = time.monotonic() + timeout if timeout else None
+        while True:
+            entry = self._try_acquire_available()
+            if entry:
+                if self._config.test_on_borrow and not self._validate(entry):
+                    self._discard(entry)
+                    continue
+                entry.last_used = time.time()
+                with self._condition:
+                    self._in_use[id(entry.conn)] = entry
+                return entry.conn
+
+            to_create = self._reserve_slot_for_new()
+            if to_create:
+                entry = self._create_entry(reserved=True)
+                if entry is None:
+                    # Creation failed; avoid tight loop when DB is unavailable.
+                    with self._condition:
+                        if self._closed:
+                            raise RuntimeError("Connection pool is closed")
+                        if deadline is not None:
+                            remaining = deadline - time.monotonic()
+                            if remaining <= 0:
+                                raise TimeoutError("Timed out waiting for connection")
+                            self._condition.wait(timeout=min(0.1, remaining))
+                        else:
+                            self._condition.wait(timeout=0.1)
+                    continue
+                with self._condition:
+                    self._in_use[id(entry.conn)] = entry
+                return entry.conn
+
+            # Wait for a return or until timed out.
+            with self._condition:
+                if self._closed:
+                    raise RuntimeError("Connection pool is closed")
+                if deadline is not None:
+                    remaining = deadline - time.monotonic()
+                    if remaining <= 0:
+                        raise TimeoutError("Timed out waiting for connection")
+                    self._condition.wait(timeout=remaining)
+                else:
+                    self._condition.wait()
+
+    def return_connection(self, conn: Any, had_error: bool = False) -> None:
+        """
+        Return a connection to the pool.
+
+        If had_error is True, the connection will be closed and removed to
+        avoid reusing broken connections.
+        """
+        entry = None
+        with self._condition:
+            entry = self._in_use.pop(id(conn), None)
+            if entry is None:
+                # Unknown connection; close it defensively.
+                try:
+                    conn.close()
+                finally:
+                    return
+
+        if had_error:
+            self._discard(entry)
+        else:
+            entry.last_used = time.time()
+            if self._should_discard(entry):
+                self._discard(entry)
+            else:
+                with self._condition:
+                    self._available.append(entry)
+                    self._condition.notify()
+
+    @contextmanager
+    def connection(self, timeout: Optional[float] = None):
+        """
+        Context manager helper that returns the connection to the pool
+        automatically and discards it if an exception is raised.
+        """
+        conn = self.borrow(timeout=timeout)
+        try:
+            yield conn
+        except Exception:
+            self.return_connection(conn, had_error=True)
+            raise
+        else:
+            self.return_connection(conn)
+
+    def close(self) -> None:
+        """
+        Close the pool and all managed connections.
+        """
+        with self._condition:
+            if self._closed:
+                return
+            self._closed = True
+            to_close = list(self._available)
+            self._available.clear()
+            to_close.extend(self._in_use.values())
+            self._in_use.clear()
+            self._condition.notify_all()
+        for entry in to_close:
+            try:
+                entry.conn.close()
+            except Exception:
+                logger.debug("Failed closing connection during pool shutdown", exc_info=True)
+        self._total = 0
+
+    def stats(self) -> Dict[str, Any]:
+        """
+        Lightweight snapshot of pool counters for observability/testing.
+        """
+        with self._condition:
+            return {
+                "total": self._total,
+                "available": len(self._available),
+                "in_use": len(self._in_use),
+                "closed": self._closed,
+            }
+
+    # Internal helpers -----------------------------------------------------
+
+    def _initial_fill(self) -> None:
+        for _ in range(self._config.min_size):
+            entry = self._create_entry()
+            if entry:
+                with self._condition:
+                    self._available.append(entry)
+                    self._total += 1
+
+    def _reserve_slot_for_new(self) -> bool:
+        with self._condition:
+            if self._closed:
+                raise RuntimeError("Connection pool is closed")
+            if self._total >= self._config.max_size:
+                return False
+            self._total += 1
+            return True
+
+    def _try_acquire_available(self) -> Optional[_ConnectionEntry]:
+        now = time.time()
+        to_discard: Deque[_ConnectionEntry] = deque()
+        with self._condition:
+            if self._closed:
+                raise RuntimeError("Connection pool is closed")
+            while self._available:
+                entry = self._available.popleft()
+                if self._should_discard(entry, now):
+                    to_discard.append(entry)
+                    continue
+                return entry
+        for entry in list(to_discard):
+            self._discard(entry)
+        return None
+
+    def _create_entry(self, reserved: bool = False) -> Optional[_ConnectionEntry]:
+        try:
+            conn = py_opengauss.open(self._config.dsn, **(self._config.connect_kwargs or {}))
+            now = time.time()
+            return _ConnectionEntry(conn=conn, created_at=now, last_used=now, last_check=now)
+        except Exception:
+            logger.exception("Failed to create new py_opengauss connection")
+            if reserved:
+                with self._condition:
+                    self._total = max(0, self._total - 1)
+            return None
+
+    def _validate(self, entry: _ConnectionEntry) -> bool:
+        try:
+            stmt = entry.conn.prepare(self._config.test_sql)
+            stmt()
+            entry.last_check = time.time()
+            return True
+        except Exception:
+            logger.warning("Validation failed; discarding connection", exc_info=True)
+            return False
+
+    def _should_discard(self, entry: _ConnectionEntry, now: Optional[float] = None) -> bool:
+        now = now or time.time()
+        if self._config.max_lifetime and now - entry.created_at >= self._config.max_lifetime:
+            return True
+        if self._config.idle_timeout and now - entry.last_used >= self._config.idle_timeout:
+            return True
+        return False
+
+    def _discard(self, entry: _ConnectionEntry) -> None:
+        try:
+            entry.conn.close()
+        except Exception:
+            logger.debug("Failed closing connection", exc_info=True)
+        with self._condition:
+            self._total = max(0, self._total - 1)
+            self._condition.notify()
+
+    def _housekeeping(self) -> None:
+        interval = max(1.0, self._config.health_check_interval)
+        while True:
+            if self._closed:
+                return
+            self._perform_health_check()
+            time.sleep(interval)
+
+    def _perform_health_check(self) -> None:
+        now = time.time()
+        to_check: Deque[_ConnectionEntry] = deque()
+        to_discard: Deque[_ConnectionEntry] = deque()
+        with self._condition:
+            if self._closed:
+                return
+            kept: Deque[_ConnectionEntry] = deque()
+            while self._available:
+                entry = self._available.popleft()
+                if self._should_discard(entry, now):
+                    to_discard.append(entry)
+                    continue
+                if (
+                    self._config.keepalive
+                    and self._config.keepalive_interval
+                    and now - entry.last_check >= self._config.keepalive_interval
+                ):
+                    to_check.append(entry)
+                else:
+                    kept.append(entry)
+            self._available = kept
+
+        for entry in list(to_discard):
+            self._discard(entry)
+
+        # Run keepalive checks outside lock to avoid blocking borrowers.
+        for entry in list(to_check):
+            if self._validate(entry):
+                with self._condition:
+                    if not self._closed:
+                        self._available.append(entry)
+            else:
+                self._discard(entry)
+
+        # Ensure minimum pool size is preserved.
+        with self._condition:
+            needed = max(0, self._config.min_size - self._total)
+        for _ in range(needed):
+            entry = self._create_entry()
+            if entry:
+                with self._condition:
+                    if self._closed:
+                        entry.conn.close()
+                        return
+                    self._available.append(entry)
+                    self._total += 1
+                    self._condition.notify()

+ 438 - 0
gdb_utils/opengauss_pool_hardened.py

@@ -0,0 +1,438 @@
+"""
+Hardened connection pool for py_opengauss inspired by dbutils.pooled_db.
+
+Key differences from the lightweight pool:
+* Uses a proxy wrapper so connections are always returned (close/__del__/with).
+* Resets sessions on return (rollback + optional setsession).
+* Avoids deadlocks by using an RLock and separating discard-with-lock paths.
+* Supports blocking/non-blocking acquire with timeout and max usage cycling.
+* Uses monotonic clocks for all interval calculations to avoid wall-clock jumps.
+"""
+
+from __future__ import annotations
+
+import logging
+import threading
+import time
+from collections import deque
+from contextlib import contextmanager
+from dataclasses import dataclass
+from typing import Any, Deque, Dict, Optional, Sequence
+
+import py_opengauss
+
+logger = logging.getLogger(__name__)
+
+
+class PoolError(Exception):
+    """Base pool error."""
+
+
+class PoolClosedError(PoolError):
+    """Pool has been closed."""
+
+
+class PoolExhaustedError(PoolError):
+    """Pool is at capacity and blocking is disabled or timed out."""
+
+
+@dataclass
+class ConnectionPoolConfig:
+    dsn: str
+    min_size: int = 1
+    max_size: int = 10
+    blocking: bool = True
+    acquire_timeout: Optional[float] = None
+    idle_timeout: float = 300.0
+    max_lifetime: float = 3600.0
+    max_usage: Optional[int] = None
+    test_on_borrow: bool = True
+    test_sql: str = "SELECT 1"
+    keepalive: bool = True
+    keepalive_interval: float = 60.0
+    health_check_interval: float = 30.0
+    reset_on_return: bool = True
+    setsession: Optional[Sequence[str]] = None
+    connect_kwargs: Optional[Dict[str, Any]] = None
+
+
+@dataclass
+class _ConnectionEntry:
+    conn: Any
+    created_at: float
+    last_used: float
+    last_check: float
+    usage_count: int = 0
+
+
+class _PooledConnectionProxy:
+    """Lightweight proxy that returns connections to the pool on close/del."""
+
+    def __init__(self, pool: "OpenGaussConnectionPool", entry: _ConnectionEntry):
+        self._pool = pool
+        self._entry = entry
+        self._returned = False
+
+    def __getattr__(self, item: str) -> Any:
+        return getattr(self._entry.conn, item)
+
+    def __enter__(self) -> "_PooledConnectionProxy":
+        return self
+
+    def __exit__(self, exc_type, exc, tb) -> None:
+        self.close(broken=bool(exc_type))
+
+    def close(self, broken: bool = False) -> None:
+        if self._returned:
+            return
+        self._returned = True
+        self._pool._return_entry(self._entry, broken=broken)
+
+    def mark_broken(self) -> None:
+        """Explicitly mark this connection as broken so it gets discarded."""
+        self.close(broken=True)
+
+    def __del__(self):
+        # Best-effort return to pool to avoid leaks if user forgets to close.
+        try:
+            self.close()
+        except Exception:
+            pass
+
+    def __repr__(self) -> str:
+        return f"<PooledOpenGaussConnection id={id(self._entry.conn)} returned={self._returned}>"
+
+
+class OpenGaussConnectionPool:
+    """
+    Hardened, thread-safe connection pool for py_opengauss.
+
+    Connections are wrapped in a proxy that ensures return to the pool on close
+    or object finalization. Pool operations rely on monotonic clocks to avoid
+    issues from system clock jumps.
+    """
+
+    def __init__(self, config: ConnectionPoolConfig):
+        if config.min_size < 0 or config.max_size <= 0:
+            raise ValueError("Pool sizes must be positive")
+        if config.min_size > config.max_size:
+            raise ValueError("min_size cannot be greater than max_size")
+
+        self._config = config
+        self._lock = threading.RLock()
+        self._condition = threading.Condition(self._lock)
+        self._available: Deque[_ConnectionEntry] = deque()
+        self._in_use: Dict[int, _ConnectionEntry] = {}
+        self._total = 0
+        self._create_failures = 0
+        self._discarded = 0
+        self._closed = False
+        self._stop_event = threading.Event()
+
+        self._housekeeper = threading.Thread(
+            target=self._housekeeping, name="opengauss-pool-housekeeper", daemon=True
+        )
+
+        self._initial_fill()
+        self._housekeeper.start()
+
+    def borrow(self, timeout: Optional[float] = None) -> _PooledConnectionProxy:
+        """
+        Borrow a connection from the pool.
+
+        If test_on_borrow is enabled, the connection will be validated with the
+        configured SQL before being returned. When blocking is False, pool
+        exhaustion immediately raises PoolExhaustedError.
+        """
+        deadline = None
+        effective_timeout = timeout
+        if effective_timeout is None:
+            effective_timeout = self._config.acquire_timeout
+        if effective_timeout is not None:
+            deadline = time.monotonic() + effective_timeout
+
+        while True:
+            entry = self._try_acquire_available()
+            if entry:
+                proxy = self._prepare_borrow(entry)
+                if proxy:
+                    return proxy
+                continue
+
+            to_create = self._reserve_slot_for_new()
+            if to_create:
+                entry = self._create_entry(reserved=True)
+                if entry is None:
+                    # Creation failed; avoid tight loop when DB is unavailable.
+                    self._wait_for_availability(deadline)
+                    continue
+                proxy = self._prepare_borrow(entry)
+                if proxy:
+                    return proxy
+                continue
+
+            # Pool is at capacity and nothing available.
+            if not self._config.blocking:
+                raise PoolExhaustedError("Pool is at capacity and blocking is disabled")
+
+            self._wait_for_availability(deadline)
+
+    def return_connection(self, conn: Any, had_error: bool = False) -> None:
+        """
+        Return a connection (or proxy) to the pool.
+
+        If had_error is True, the connection will be closed and removed to
+        avoid reusing broken connections.
+        """
+        entry = None
+        if isinstance(conn, _PooledConnectionProxy):
+            conn.close(broken=had_error)
+            return
+
+        with self._condition:
+            entry = self._in_use.pop(id(conn), None)
+
+        if entry is None:
+            try:
+                conn.close()
+            finally:
+                return
+        self._return_entry(entry, broken=had_error)
+
+    @contextmanager
+    def connection(self, timeout: Optional[float] = None):
+        """
+        Context manager helper that returns the connection to the pool
+        automatically and discards it if an exception is raised.
+        """
+        proxy = self.borrow(timeout=timeout)
+        try:
+            yield proxy
+        except Exception:
+            proxy.close(broken=True)
+            raise
+        else:
+            proxy.close()
+
+    def close(self) -> None:
+        """Close the pool and all managed connections."""
+        with self._condition:
+            if self._closed:
+                return
+            self._closed = True
+            to_close = list(self._available)
+            self._available.clear()
+            to_close.extend(self._in_use.values())
+            self._in_use.clear()
+            self._condition.notify_all()
+            self._stop_event.set()
+        for entry in to_close:
+            self._close_entry(entry)
+        self._total = 0
+        if self._housekeeper.is_alive():
+            self._housekeeper.join(timeout=1.0)
+
+    def stats(self) -> Dict[str, Any]:
+        """Lightweight snapshot of pool counters for observability/testing."""
+        with self._condition:
+            return {
+                "total": self._total,
+                "available": len(self._available),
+                "in_use": len(self._in_use),
+                "closed": self._closed,
+                "create_failures": self._create_failures,
+                "discarded": self._discarded,
+            }
+
+    # Internal helpers -----------------------------------------------------
+
+    def _prepare_borrow(self, entry: _ConnectionEntry) -> Optional[_PooledConnectionProxy]:
+        if self._config.test_on_borrow and not self._validate(entry):
+            self._discard_entry(entry)
+            return None
+        entry.last_used = time.monotonic()
+        entry.usage_count += 1
+        with self._condition:
+            self._in_use[id(entry.conn)] = entry
+        return _PooledConnectionProxy(self, entry)
+
+    def _initial_fill(self) -> None:
+        for _ in range(self._config.min_size):
+            entry = self._create_entry()
+            if entry:
+                with self._condition:
+                    self._available.append(entry)
+                    self._total += 1
+
+    def _reserve_slot_for_new(self) -> bool:
+        with self._condition:
+            if self._closed:
+                raise PoolClosedError("Connection pool is closed")
+            if self._total >= self._config.max_size:
+                return False
+            self._total += 1
+            return True
+
+    def _try_acquire_available(self) -> Optional[_ConnectionEntry]:
+        now = time.monotonic()
+        with self._condition:
+            if self._closed:
+                raise PoolClosedError("Connection pool is closed")
+            while self._available:
+                entry = self._available.popleft()
+                if self._should_discard(entry, now):
+                    self._discard_entry_locked(entry)
+                    continue
+                return entry
+        return None
+
+    def _create_entry(self, reserved: bool = False) -> Optional[_ConnectionEntry]:
+        try:
+            conn = py_opengauss.open(self._config.dsn, **(self._config.connect_kwargs or {}))
+            now = time.monotonic()
+            return _ConnectionEntry(conn=conn, created_at=now, last_used=now, last_check=now)
+        except Exception:
+            logger.exception("Failed to create new py_opengauss connection")
+            self._create_failures += 1
+            if reserved:
+                with self._condition:
+                    self._total = max(0, self._total - 1)
+                    self._condition.notify_all()
+            return None
+
+    def _validate(self, entry: _ConnectionEntry) -> bool:
+        try:
+            stmt = entry.conn.prepare(self._config.test_sql)
+            stmt()
+            entry.last_check = time.monotonic()
+            return True
+        except Exception:
+            logger.warning("Validation failed; discarding connection", exc_info=True)
+            return False
+
+    def _should_discard(self, entry: _ConnectionEntry, now: Optional[float] = None) -> bool:
+        now = now or time.monotonic()
+        if self._config.max_lifetime and now - entry.created_at >= self._config.max_lifetime:
+            return True
+        if self._config.idle_timeout and now - entry.last_used >= self._config.idle_timeout:
+            return True
+        if self._config.max_usage and entry.usage_count >= self._config.max_usage:
+            return True
+        return False
+
+    def _reset_connection(self, entry: _ConnectionEntry) -> bool:
+        if not self._config.reset_on_return and not self._config.setsession:
+            return True
+        try:
+            if self._config.reset_on_return:
+                rollback = getattr(entry.conn, "rollback", None)
+                if callable(rollback):
+                    rollback()
+                else:
+                    entry.conn.execute("ROLLBACK")
+            if self._config.setsession:
+                for sql in self._config.setsession:
+                    stmt = entry.conn.prepare(sql)
+                    stmt()
+            return True
+        except Exception:
+            logger.warning("Failed to reset connection; discarding", exc_info=True)
+            return False
+
+    def _close_entry(self, entry: _ConnectionEntry) -> None:
+        try:
+            entry.conn.close()
+        except Exception:
+            logger.debug("Failed closing connection", exc_info=True)
+
+    def _discard_entry_locked(self, entry: _ConnectionEntry) -> None:
+        """Discard while holding the pool lock."""
+        self._close_entry(entry)
+        self._total = max(0, self._total - 1)
+        self._discarded += 1
+        self._condition.notify_all()
+
+    def _discard_entry(self, entry: _ConnectionEntry) -> None:
+        with self._condition:
+            self._discard_entry_locked(entry)
+
+    def _return_entry(self, entry: _ConnectionEntry, broken: bool = False) -> None:
+        with self._condition:
+            stored = self._in_use.pop(id(entry.conn), None)
+        if stored is None:
+            return
+        entry = stored
+        if broken or not self._reset_connection(entry) or self._should_discard(entry):
+            self._discard_entry(entry)
+            return
+        with self._condition:
+            if self._closed:
+                self._discard_entry_locked(entry)
+                return
+            entry.last_used = time.monotonic()
+            self._available.append(entry)
+            self._condition.notify_all()
+
+    def _wait_for_availability(self, deadline: Optional[float]) -> None:
+        if deadline is not None:
+            remaining = deadline - time.monotonic()
+            if remaining <= 0:
+                raise PoolExhaustedError("Timed out waiting for connection from pool")
+            with self._condition:
+                if self._closed:
+                    raise PoolClosedError("Connection pool is closed")
+                self._condition.wait(timeout=remaining)
+        else:
+            with self._condition:
+                if self._closed:
+                    raise PoolClosedError("Connection pool is closed")
+                self._condition.wait()
+
+    def _housekeeping(self) -> None:
+        interval = max(1.0, self._config.health_check_interval)
+        while not self._stop_event.wait(interval):
+            self._perform_health_check()
+
+    def _perform_health_check(self) -> None:
+        now = time.monotonic()
+        to_check: Deque[_ConnectionEntry] = deque()
+        with self._condition:
+            if self._closed:
+                return
+            kept: Deque[_ConnectionEntry] = deque()
+            while self._available:
+                entry = self._available.popleft()
+                if self._should_discard(entry, now):
+                    self._discard_entry_locked(entry)
+                    continue
+                if (
+                    self._config.keepalive
+                    and self._config.keepalive_interval
+                    and now - entry.last_check >= self._config.keepalive_interval
+                ):
+                    to_check.append(entry)
+                else:
+                    kept.append(entry)
+            self._available = kept
+
+        for entry in list(to_check):
+            if self._validate(entry):
+                with self._condition:
+                    if not self._closed:
+                        entry.last_used = time.monotonic()
+                        self._available.append(entry)
+            else:
+                self._discard_entry(entry)
+
+        with self._condition:
+            needed = max(0, self._config.min_size - self._total)
+        for _ in range(needed):
+            entry = self._create_entry()
+            if entry:
+                with self._condition:
+                    if self._closed:
+                        self._close_entry(entry)
+                        return
+                    self._available.append(entry)
+                    self._total += 1
+                    self._condition.notify_all()

+ 20 - 0
prompts/generate_qa_pair.txt

@@ -0,0 +1,20 @@
+你是一名 BI 系统的问答构建助手,负责为每张卡片生成可落地执行的 SQL 及对应的用户问题。
+
+输入提供了仪表盘与卡片定义,包括:
+- 每张卡片的 SELECT/GROUP BY 语句(未带 WHERE/HAVING)
+- F 类型筛选器:用户可调整的业务过滤条件
+- D 类型筛选器:卡片的固化条件,必须始终应用
+
+任务要求:
+1. 仅针对 F 类型筛选器,挑选 1~3 个最有业务意义的组合(可以是 1 个或多个)。
+2. 将所选 F 筛选器与所有 D 类型固化条件一起拼接进 SQL 的 WHERE/HAVING 中,生成可直接执行的 SQL。
+3. 为每个生成的 SQL 写出清晰的用户问题与其带槽位的问题(sample_question 可包含占位符)。
+4. 严格输出 JSON 数组,每个元素字段:
+   - card_id
+   - question
+   - sample_question
+   - filter_ids: 所选的 F 类型筛选器 ID 列表(D 类型无需列出,会自动附加)
+   - sql: 拼接后的完整 SQL 文本
+
+输入内容:
+{content}

+ 21 - 0
prompts/generate_qa_pair_with_user_request.txt

@@ -0,0 +1,21 @@
+你是一名 BI 系统的问答构建助手,负责为每张卡片生成可落地执行的 SQL 及对应的用户问题。
+用户的额外需求:{user_request}
+
+输入提供了仪表盘与卡片定义,包括:
+- 每张卡片的 SELECT/GROUP BY 语句(未带 WHERE/HAVING)
+- F 类型筛选器:用户可调整的业务过滤条件
+- D 类型筛选器:卡片的固化条件,必须始终应用
+
+任务要求:
+1. 仅针对 F 类型筛选器,挑选 1~3 个最有业务意义且满足用户需求的组合。
+2. 将所选 F 筛选器与所有 D 类型固化条件一起拼接进 SQL 的 WHERE/HAVING 中,生成可直接执行的 SQL。
+3. 为每个生成的 SQL 写出清晰的用户问题与其带槽位的问题(sample_question 可包含占位符)。
+4. 严格输出 JSON 数组,每个元素字段:
+   - card_id
+   - question
+   - sample_question
+   - filter_ids: 所选的 F 类型筛选器 ID 列表(D 类型无需列出,会自动附加)
+   - sql: 拼接后的完整 SQL 文本
+
+输入内容:
+{content}

+ 17 - 0
requirements.txt

@@ -0,0 +1,17 @@
+annotated-doc==0.0.4
+annotated-types==0.7.0
+anyio==4.11.0
+duckdb==1.4.2
+fastapi==0.121.2
+httpx==0.27.2
+idna==3.11
+pydantic==2.12.4
+pydantic_core==2.41.5
+PyYAML==6.0.3
+sniffio==1.3.1
+starlette==0.49.3
+typing-inspection==0.4.2
+typing_extensions==4.15.0
+uvicorn==0.30.6
+pandas==2.2.3
+py-opengauss

+ 52 - 0
test_field_similarity.py

@@ -0,0 +1,52 @@
+import unittest
+
+from field_similarity import analyze_fields
+
+
+class TestAnalyzeFields(unittest.TestCase):
+    def test_full_match(self):
+        result = analyze_fields("hello world", "hello@@world")
+        self.assertEqual(result["field_count"], 2)
+        self.assertEqual(result["non_zero_count"], 2)
+        self.assertAlmostEqual(result["avg_ratio"], 1.0)
+        self.assertAlmostEqual(result["max_ratio"], 1.0)
+
+    def test_partial_match(self):
+        result = analyze_fields("abc", "zab")
+        self.assertEqual(result["field_count"], 1)
+        self.assertEqual(result["non_zero_count"], 1)
+        self.assertAlmostEqual(result["avg_ratio"], 2 / 3)
+        self.assertAlmostEqual(result["max_ratio"], 2 / 3)
+
+    def test_zero_match(self):
+        result = analyze_fields("abc", "xyz")
+        self.assertEqual(result["field_count"], 1)
+        self.assertEqual(result["non_zero_count"], 0)
+        self.assertAlmostEqual(result["avg_ratio"], 0.0)
+        self.assertAlmostEqual(result["max_ratio"], 0.0)
+
+    def test_empty_fields_are_skipped(self):
+        result = analyze_fields("abc", "ab@@@@bc")
+        self.assertEqual(result["field_count"], 2)
+        self.assertEqual(result["non_zero_count"], 2)
+        self.assertAlmostEqual(result["avg_ratio"], (1 + 1) / 2)
+        self.assertAlmostEqual(result["max_ratio"], 1.0)
+
+    def test_all_empty_fields(self):
+        result = analyze_fields("abc", "@@")
+        self.assertEqual(result, {})
+
+    def test_empty_field_list_string(self):
+        result = analyze_fields("abc", "")
+        self.assertEqual(result, {})
+
+    def test_empty_question(self):
+        result = analyze_fields("", "a@@bb")
+        self.assertEqual(result["field_count"], 2)
+        self.assertEqual(result["non_zero_count"], 0)
+        self.assertAlmostEqual(result["avg_ratio"], 0.0)
+        self.assertAlmostEqual(result["max_ratio"], 0.0)
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 109 - 0
tests/db_test.ipynb

@@ -0,0 +1,109 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "id": "6eaf856e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "os.chdir(\"../\")\n",
+    "from gdb_utils import ConnectionPoolConfig, OpenGaussConnectionPool\n",
+    "import py_opengauss\n",
+    "\n",
+    "DB_HOST = \"127.0.0.1\"\n",
+    "DB_PORT = 5432\n",
+    "DB_USER = \"gaussdb\"\n",
+    "DB_PASSWORD = \"Ysl#1234\"\n",
+    "DB_NAME = \"postgres\"\n",
+    "\n",
+    "\n",
+    "def build_dsn() -> str:\n",
+    "    return f\"opengauss://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}\"\n",
+    "\n",
+    "conn = py_opengauss.open(build_dsn())\n",
+    "conn.execute(\"ROLLBACK\")\n",
+    "\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "id": "opengauss-pool-multithread",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "During workload stats: {'total': 10, 'available': 0, 'in_use': 1, 'closed': False}\n",
+      "After workload stats: {'total': 10, 'available': 10, 'in_use': 0, 'closed': False}\n"
+     ]
+    }
+   ],
+   "source": [
+    "import threading, time, datetime\n",
+    "\n",
+    "pool_config = ConnectionPoolConfig(\n",
+    "    dsn=build_dsn(),\n",
+    "    min_size=1,\n",
+    "    max_size=20,\n",
+    "    test_on_borrow=True,\n",
+    "    test_sql=\"SELECT 1\",\n",
+    ")\n",
+    "pool = OpenGaussConnectionPool(pool_config)\n",
+    "\n",
+    "insert_sql = \"INSERT INTO my_schema.table1 (u_uid, process_id, ins_tm, content) VALUES ($1, $2, $3, $4)\"\n",
+    "delete_sql = \"DELETE FROM my_schema.table1 WHERE u_uid = $1\"\n",
+    "\n",
+    "def worker(thread_id: int):\n",
+    "    # u_uid is int; keep within 32-bit and unique-ish per thread\n",
+    "    uid = thread_id * 100000 + int(time.time() * 1000) % 100000\n",
+    "    with pool.connection() as conn:\n",
+    "        inserter = conn.prepare(insert_sql)\n",
+    "        # deleter = conn.prepare(delete_sql)\n",
+    "        inserter(uid, f\"thr{thread_id:02d}\", datetime.datetime.now(), \"worker insert\")\n",
+    "        # Hold the connection briefly to observe pool utilization\n",
+    "        time.sleep(2)\n",
+    "        # deleter(uid)\n",
+    "\n",
+    "threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]\n",
+    "for t in threads:\n",
+    "    t.start()\n",
+    "\n",
+    "# Check stats while work is in flight\n",
+    "print(\"During workload stats:\", pool.stats())\n",
+    "\n",
+    "for t in threads:\n",
+    "    t.join()\n",
+    "\n",
+    "print(\"After workload stats:\", pool.stats())\n",
+    "\n",
+    "# Close the pool when done\n",
+    "pool.close()\n"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": ".venv (3.13.3)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.13.3"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}

+ 50 - 0
tests/test_generate_endpoint.py

@@ -0,0 +1,50 @@
+import asyncio
+import unittest
+from pathlib import Path
+
+from app.endpoints.generate_qa_pair import generate_qa_pair
+from app.endpoints.schema import QARequest
+
+
+class GenerateQAPairEndpointTest(unittest.TestCase):
+    def test_get_dashboard_info_operator(self):
+        from app.operators.get_dashboard_info import get_dashboard_info
+
+        info = get_dashboard_info({
+            "dashboard_id": "DB001",
+            "card_ids": ["C0001"],
+            "bbk": "BBK002",
+        })
+
+        self.assertEqual(info.dashboard_id, "DB001")
+        self.assertGreaterEqual(len(info.cards), 1)
+        self.assertEqual(info.cards[0].card_id, "C0001")
+
+    def test_generate_qa_pair_endpoint_returns_data(self):
+        request = QARequest(
+            request_id="req-001",
+            dashboard_id="DB001",
+            card_ids=["C0001", "C0002"],
+            bbk_id="BBK002",
+            user_request="测试问题",
+        )
+
+        response_model = asyncio.run(generate_qa_pair(request))
+
+        self.assertEqual(response_model.returnCode, "SUCCESS")
+        body = response_model.body
+        self.assertEqual(body.dashboard_id, request.dashboard_id)
+        self.assertTrue(set(body.card_ids).issubset(set(request.card_ids)))
+        self.assertGreaterEqual(len(body.qa_pairs), 1)
+        first = body.qa_pairs[0]
+        self.assertEqual(first.dashboard_id, request.dashboard_id)
+        self.assertIn(first.card_id, request.card_ids)
+
+        log_dir = Path("app/log/test_results")
+        log_dir.mkdir(parents=True, exist_ok=True)
+        outfile = log_dir / "generate_qa_pair_response.txt"
+        outfile.write_text(response_model.model_dump_json(ensure_ascii=False, indent=2))
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 37 - 0
tests/test_generate_operator.py

@@ -0,0 +1,37 @@
+import asyncio
+import unittest
+
+from app.operators.get_dashboard_info import get_dashboard_info
+from app.operators.generate_qa_pair import generate_qa_pair
+
+
+class GenerateOperatorTest(unittest.TestCase):
+    def setUp(self):
+        self.input_args = {
+            "dashboard_id": "DB001",
+            "card_ids": ["C0001", "C0002"],
+            "bbk": "BBK002",
+        }
+        self.dashboard_info = get_dashboard_info(self.input_args)
+
+    def test_generate_qa_pair_fallback_output(self):
+        result = asyncio.run(
+            generate_qa_pair({
+                "get_dashboard_info": self.dashboard_info,
+                "dashboard_id": self.input_args["dashboard_id"],
+                "card_ids": self.input_args["card_ids"],
+                "bbk": self.input_args["bbk"],
+                "user_request": "",
+            })
+        )
+
+        self.assertEqual(len(result), len(self.dashboard_info.cards))
+        first = result[0]
+        print(first)
+        self.assertEqual(first.dashboard_id, self.input_args["dashboard_id"])
+        self.assertIn(first.card_id, self.input_args["card_ids"])
+        self.assertTrue(first.question)
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 340 - 0
tests/test_opengauss_pool.py

@@ -0,0 +1,340 @@
+import threading
+import time
+import unittest
+from typing import List
+
+try:
+    import py_opengauss  # type: ignore  # noqa: F401
+except ImportError:  # pragma: no cover - handled via skip
+    py_opengauss = None
+
+from gdb_utils import ConnectionPoolConfig, OpenGaussConnectionPool
+
+
+DB_HOST = "127.0.0.1"
+DB_PORT = 5432
+DB_USER = "gaussdb"
+DB_PASSWORD = "Ysl#1234"
+DB_NAME = "postgres"
+
+
+def build_dsn() -> str:
+    return f"opengauss://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
+
+
+@unittest.skipIf(py_opengauss is None, "py_opengauss is required for integration tests")
+class OpenGaussPoolIntegrationTest(unittest.TestCase):
+    seed_uid: int
+    seed_process_id: str
+    config: ConnectionPoolConfig
+    pool: OpenGaussConnectionPool
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        base = int(time.time()) % 1_500_000_000
+        cls.seed_uid = base + 100
+        cls.seed_process_id = f"pool_seed_{base}"
+        conn = py_opengauss.open(build_dsn())
+        try:
+            delete_stmt = conn.prepare("DELETE FROM my_schema.table1 WHERE u_uid = $1")
+            delete_stmt(cls.seed_uid)
+            insert_stmt = conn.prepare(
+                """
+                INSERT INTO my_schema.table1 (u_uid, process_id, ins_tm, content)
+                VALUES ($1, $2, NOW(), $3)
+                """
+            )
+            insert_stmt(cls.seed_uid, cls.seed_process_id, "seed")
+        finally:
+            conn.close()
+
+        cls.config = ConnectionPoolConfig(
+            dsn=build_dsn(),
+            min_size=2,
+            max_size=8,
+            idle_timeout=30,
+            max_lifetime=60,
+            test_on_borrow=True,
+            test_sql="SELECT 1",
+            keepalive=True,
+            keepalive_interval=0.5,
+            health_check_interval=0.5,
+        )
+        cls.pool = OpenGaussConnectionPool(cls.config)
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        if hasattr(cls, "pool") and cls.pool:
+            cls.pool.close()
+        conn = py_opengauss.open(build_dsn())
+        try:
+            delete_stmt = conn.prepare(
+                "DELETE FROM my_schema.table1 WHERE process_id = $1"
+            )
+            delete_stmt(cls.seed_process_id)
+        finally:
+            conn.close()
+
+    def _wait_for_min_size(self, timeout: float = 2.0) -> None:
+        deadline = time.monotonic() + timeout
+        while time.monotonic() < deadline:
+            stats = self.pool.stats()
+            if stats["total"] >= self.config.min_size:
+                return
+            time.sleep(0.05)
+
+    def test_read_and_write(self):
+        print("STEP: test_read_and_write - setup table and insert row")
+        # u_uid,name are VARCHAR(10); keep identifiers short.
+        unique_uid = f"u{int(time.time()) % 100000:05d}"
+        # Ensure schema/table exist and insert a row.
+        with self.pool.connection() as conn:
+            conn.execute("CREATE SCHEMA IF NOT EXISTS my_schema")
+            conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS my_schema.test_table (
+                    u_uid VARCHAR(10),
+                    name VARCHAR(10),
+                    CONSTRAINT pk_uid PRIMARY KEY(u_uid)
+                )
+                """
+            )
+            # Ensure no leftover row with same key before insert.
+            delete_before_insert = conn.prepare("DELETE FROM my_schema.test_table WHERE u_uid = $1")
+            delete_before_insert(unique_uid)
+            insert_stmt = conn.prepare(
+                "INSERT INTO my_schema.test_table (u_uid, name) VALUES ($1, $2)"
+            )
+            insert_stmt(unique_uid, "codex")
+
+        print("STEP: test_read_and_write - read row and assert")
+        # Read back the row.
+        with self.pool.connection() as conn:
+            select_stmt = conn.prepare("SELECT name FROM my_schema.test_table WHERE u_uid = $1")
+            rows = select_stmt(unique_uid)
+
+        self.assertIsNotNone(rows)
+        self.assertGreaterEqual(len(rows), 1)
+        self.assertEqual(rows[0][0], "codex")
+
+        print("STEP: test_read_and_write - cleanup inserted row")
+        # Clean up the inserted row.
+        with self.pool.connection() as conn:
+            delete_stmt = conn.prepare("DELETE FROM my_schema.test_table WHERE u_uid = $1")
+            delete_stmt(unique_uid)
+
+    def test_broken_connection_is_replaced(self):
+        print("STEP: test_broken_connection_is_replaced - close and return broken connection")
+        conn = self.pool.borrow()
+        # Simulate an unusable connection by closing it and marking as broken.
+        conn.close()
+        self.pool.return_connection(conn, had_error=True)
+
+        print("STEP: test_broken_connection_is_replaced - borrow replacement and validate")
+        # Borrow again; pool should create a new usable connection.
+        with self.pool.connection() as conn2:
+            stmt = conn2.prepare("SELECT 1")
+            rows = stmt()
+        self.assertEqual(rows[0][0], 1)
+
+        self._wait_for_min_size()
+        stats_after = self.pool.stats()
+        self.assertFalse(stats_after["closed"])
+        self.assertLessEqual(stats_after["total"], self.config.max_size)
+        self.assertGreaterEqual(stats_after["total"], self.config.min_size)
+
+    def test_pool_limit_and_timeout(self):
+        print("STEP: test_pool_limit_and_timeout - exhaust pool")
+        borrowed = [self.pool.borrow() for _ in range(self.config.max_size)]
+        errors: List[Exception] = []
+
+        print("STEP: test_pool_limit_and_timeout - attempt timed borrow")
+        def try_borrow():
+            try:
+                self.pool.borrow(timeout=0.5)
+            except Exception as exc:  # noqa: BLE001 - capture TimeoutError
+                errors.append(exc)
+
+        t = threading.Thread(target=try_borrow)
+        t.start()
+        t.join()
+
+        print("STEP: test_pool_limit_and_timeout - return connections and assert stats")
+        for conn in borrowed:
+            self.pool.return_connection(conn)
+        stats = self.pool.stats()
+
+        self.assertLessEqual(stats["total"], self.config.max_size)
+        self.assertEqual(stats["in_use"], 0)
+        self.assertGreaterEqual(stats["available"], self.config.min_size)
+        self.assertTrue(any(isinstance(err, TimeoutError) for err in errors))
+
+    def test_serial_reads_with_lifecycle(self):
+        print("STEP: test_serial_reads_with_lifecycle - serial reads with pauses")
+        for _ in range(3):
+            with self.pool.connection() as conn:
+                select_stmt = conn.prepare(
+                    "SELECT content FROM my_schema.table1 WHERE u_uid = $1"
+                )
+                rows = select_stmt(self.seed_uid)
+                self.assertEqual(rows[0][0], "seed")
+                time.sleep(0.5)
+
+        print("STEP: test_serial_reads_with_lifecycle - assert pool stats")
+        stats = self.pool.stats()
+
+        self.assertLessEqual(stats["total"], self.config.max_size)
+        self.assertFalse(stats["closed"])
+
+    def test_concurrent_reads(self):
+        print("STEP: test_concurrent_reads - start threads")
+        results: List[str] = []
+        errors: List[Exception] = []
+        lock = threading.Lock()
+
+        def worker():
+            try:
+                with self.pool.connection(timeout=2.0) as conn:
+                    select_stmt = conn.prepare(
+                        "SELECT content FROM my_schema.table1 WHERE u_uid = $1"
+                    )
+                    rows = select_stmt(self.seed_uid)
+                with lock:
+                    results.append(rows[0][0])
+            except Exception as exc:  # noqa: BLE001 - capture any worker errors
+                with lock:
+                    errors.append(exc)
+
+        threads = [threading.Thread(target=worker) for _ in range(20)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        print("STEP: test_concurrent_reads - assert results and pool stats")
+        stats = self.pool.stats()
+
+        self.assertEqual(len(errors), 0)
+        self.assertEqual(len(results), 20)
+        self.assertFalse(stats["closed"])
+        self.assertLessEqual(stats["total"], self.config.max_size)
+
+    def test_concurrent_batch_writes(self):
+        print("STEP: test_concurrent_batch_writes - start writer threads")
+        base = self.seed_uid + 10000
+        process_id = f"pool_batch_{base}"
+        total_batches = 5
+        batch_size = 20
+        worker_count = 8
+        errors: List[Exception] = []
+        lock = threading.Lock()
+
+        def writer(worker_idx: int):
+            try:
+                for batch_idx in range(total_batches):
+                    with self.pool.connection(timeout=2.0) as conn:
+                        insert_stmt = conn.prepare(
+                            """
+                            INSERT INTO my_schema.table1 (u_uid, process_id, ins_tm, content)
+                            VALUES ($1, $2, NOW(), $3)
+                            """
+                        )
+                        for i in range(batch_size):
+                            uid = base + (worker_idx * 1000) + (batch_idx * batch_size) + i
+                            insert_stmt(uid, process_id, f"batch_{batch_idx}")
+            except Exception as exc:  # noqa: BLE001 - capture any worker errors
+                with lock:
+                    errors.append(exc)
+
+        threads = [threading.Thread(target=writer, args=(i,)) for i in range(worker_count)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        print("STEP: test_concurrent_batch_writes - validate count and cleanup")
+        with self.pool.connection(timeout=2.0) as conn:
+            count_stmt = conn.prepare(
+                "SELECT COUNT(*) FROM my_schema.table1 WHERE process_id = $1"
+            )
+            rows = count_stmt(process_id)
+            total_inserted = rows[0][0]
+            delete_stmt = conn.prepare(
+                "DELETE FROM my_schema.table1 WHERE process_id = $1"
+            )
+            delete_stmt(process_id)
+
+        stats = self.pool.stats()
+
+        self.assertEqual(len(errors), 0)
+        self.assertEqual(total_inserted, worker_count * total_batches * batch_size)
+        self.assertFalse(stats["closed"])
+
+    def test_connection_lifecycle_management(self):
+        print("STEP: test_connection_lifecycle_management - borrow and age connection")
+        conn = self.pool.borrow()
+        conn.prepare("SELECT 1")()
+        time.sleep(self.config.max_lifetime + 0.2)
+        self.pool.return_connection(conn)
+
+        print("STEP: test_connection_lifecycle_management - borrow new connection")
+        new_conn = self.pool.borrow(timeout=1.0)
+        self.pool.return_connection(new_conn)
+        stats = self.pool.stats()
+
+        self.assertNotEqual(id(conn), id(new_conn))
+        self.assertLessEqual(stats["total"], self.config.max_size)
+
+    def test_connection_validation_replaces_bad_connection(self):
+        print("STEP: test_connection_validation_replaces_bad_connection - return closed conn")
+        conn = self.pool.borrow()
+        conn.close()
+        self.pool.return_connection(conn, had_error=False)
+
+        print("STEP: test_connection_validation_replaces_bad_connection - borrow and validate")
+        with self.pool.connection(timeout=1.0) as conn2:
+            rows = conn2.prepare("SELECT 1")()
+        stats = self.pool.stats()
+
+        self.assertEqual(rows[0][0], 1)
+        self.assertLessEqual(stats["total"], self.config.max_size)
+
+    def test_long_running_requests(self):
+        print("STEP: test_long_running_requests - run mixed workload")
+        errors: List[Exception] = []
+        lock = threading.Lock()
+        stop_at = time.monotonic() + 3.0
+
+        def load_worker():
+            while time.monotonic() < stop_at:
+                try:
+                    with self.pool.connection(timeout=2.0) as conn:
+                        conn.prepare("SELECT 1")()
+                        time.sleep(0.2)
+                        select_stmt = conn.prepare(
+                            "SELECT content FROM my_schema.table1 WHERE u_uid = $1"
+                        )
+                        rows = select_stmt(self.seed_uid)
+                        if rows[0][0] != "seed":
+                            raise AssertionError("Unexpected content")
+                except Exception as exc:  # noqa: BLE001 - capture any worker errors
+                    with lock:
+                        errors.append(exc)
+                    return
+
+        threads = [threading.Thread(target=load_worker) for _ in range(4)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        print("STEP: test_long_running_requests - assert pool stats")
+        stats = self.pool.stats()
+
+        self.assertEqual(len(errors), 0)
+        self.assertFalse(stats["closed"])
+        self.assertLessEqual(stats["total"], self.config.max_size)
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 298 - 0
tests/test_opengauss_pool_hardened.py

@@ -0,0 +1,298 @@
+import threading
+import time
+import unittest
+from typing import List
+
+try:
+    import py_opengauss  # type: ignore  # noqa: F401
+except ImportError:  # pragma: no cover - handled via skip
+    py_opengauss = None
+
+from gdb_utils.opengauss_pool_hardened import (
+    ConnectionPoolConfig,
+    OpenGaussConnectionPool,
+    PoolExhaustedError,
+)
+
+
+DB_HOST = "127.0.0.1"
+DB_PORT = 5432
+DB_USER = "gaussdb"
+DB_PASSWORD = "Ysl#1234"
+DB_NAME = "postgres"
+
+
+def build_dsn() -> str:
+    return f"opengauss://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
+
+
+@unittest.skipIf(py_opengauss is None, "py_opengauss is required for integration tests")
+class OpenGaussHardenedPoolIntegrationTest(unittest.TestCase):
+    seed_uid: int
+    seed_process_id: str
+    config: ConnectionPoolConfig
+    pool: OpenGaussConnectionPool
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        base = int(time.time()) % 1_500_000_000
+        cls.seed_uid = base + 200
+        cls.seed_process_id = f"hardened_seed_{base}"
+        conn = py_opengauss.open(build_dsn())
+        try:
+            delete_stmt = conn.prepare("DELETE FROM my_schema.table1 WHERE u_uid = $1")
+            delete_stmt(cls.seed_uid)
+            insert_stmt = conn.prepare(
+                """
+                INSERT INTO my_schema.table1 (u_uid, process_id, ins_tm, content)
+                VALUES ($1, $2, NOW(), $3)
+                """
+            )
+            insert_stmt(cls.seed_uid, cls.seed_process_id, "seed")
+        finally:
+            conn.close()
+
+        cls.config = ConnectionPoolConfig(
+            dsn=build_dsn(),
+            min_size=2,
+            max_size=6,
+            blocking=True,
+            acquire_timeout=0.5,
+            idle_timeout=10,
+            max_lifetime=30,
+            max_usage=20,
+            test_on_borrow=True,
+            test_sql="SELECT 1",
+            keepalive=True,
+            keepalive_interval=0.5,
+            health_check_interval=0.5,
+            reset_on_return=True,
+        )
+        cls.pool = OpenGaussConnectionPool(cls.config)
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        if hasattr(cls, "pool") and cls.pool:
+            cls.pool.close()
+        conn = py_opengauss.open(build_dsn())
+        try:
+            delete_stmt = conn.prepare(
+                "DELETE FROM my_schema.table1 WHERE process_id = $1"
+            )
+            delete_stmt(cls.seed_process_id)
+        finally:
+            conn.close()
+
+    def _new_pool(self, **overrides) -> OpenGaussConnectionPool:
+        config_dict = self.config.__dict__.copy()
+        config_dict.update(overrides)
+        return OpenGaussConnectionPool(ConnectionPoolConfig(**config_dict))
+
+    def test_pool_limit_and_timeout(self):
+        print("STEP: test_pool_limit_and_timeout - exhaust pool")
+        borrowed = [self.pool.borrow() for _ in range(self.config.max_size)]
+
+        print("STEP: test_pool_limit_and_timeout - timed borrow raises")
+        with self.assertRaises(PoolExhaustedError):
+            self.pool.borrow(timeout=0.3)
+
+        print("STEP: test_pool_limit_and_timeout - return connections and assert stats")
+        for conn in borrowed:
+            conn.close()
+        stats = self.pool.stats()
+
+        self.assertLessEqual(stats["total"], self.config.max_size)
+        self.assertEqual(stats["in_use"], 0)
+        self.assertGreaterEqual(stats["available"], self.config.min_size)
+
+    def test_serial_reads_with_lifecycle(self):
+        print("STEP: test_serial_reads_with_lifecycle - serial reads with pauses")
+        pool = self._new_pool(min_size=1, max_size=3, max_lifetime=1.0, idle_timeout=5.0)
+        stats = {}
+        seen_ids = set()
+        stats_before = pool.stats()
+        try:
+            for idx in range(4):
+                with pool.connection(timeout=1.0) as conn:
+                    seen_ids.add(id(conn._entry.conn))
+                    select_stmt = conn.prepare(
+                        "SELECT content FROM my_schema.table1 WHERE u_uid = $1"
+                    )
+                    rows = select_stmt(self.seed_uid)
+                    self.assertEqual(rows[0][0], "seed")
+                    if idx == 1:
+                        time.sleep(1.2)
+                    else:
+                        time.sleep(0.2)
+            stats = pool.stats()
+        finally:
+            pool.close()
+
+        print("STEP: test_serial_reads_with_lifecycle - assert pool stats")
+        self.assertLessEqual(stats["total"], 3)
+        self.assertFalse(stats["closed"])
+        self.assertTrue(
+            len(seen_ids) >= 2 or stats["discarded"] > stats_before["discarded"]
+        )
+
+    def test_concurrent_reads(self):
+        print("STEP: test_concurrent_reads - start threads")
+        results: List[str] = []
+        errors: List[Exception] = []
+        lock = threading.Lock()
+
+        def worker():
+            try:
+                with self.pool.connection(timeout=2.0) as conn:
+                    select_stmt = conn.prepare(
+                        "SELECT content FROM my_schema.table1 WHERE u_uid = $1"
+                    )
+                    rows = select_stmt(self.seed_uid)
+                with lock:
+                    results.append(rows[0][0])
+            except Exception as exc:  # noqa: BLE001 - capture any worker errors
+                with lock:
+                    errors.append(exc)
+
+        threads = [threading.Thread(target=worker) for _ in range(20)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        print("STEP: test_concurrent_reads - assert results and pool stats")
+        stats = self.pool.stats()
+
+        self.assertEqual(len(errors), 0)
+        self.assertEqual(len(results), 20)
+        self.assertFalse(stats["closed"])
+        self.assertLessEqual(stats["total"], self.config.max_size)
+
+    def test_concurrent_batch_writes(self):
+        print("STEP: test_concurrent_batch_writes - start writer threads")
+        base = self.seed_uid + 20000
+        process_id = f"hardened_batch_{base}"
+        total_batches = 5
+        batch_size = 20
+        worker_count = 8
+        errors: List[Exception] = []
+        lock = threading.Lock()
+
+        def writer(worker_idx: int):
+            try:
+                for batch_idx in range(total_batches):
+                    with self.pool.connection(timeout=2.0) as conn:
+                        insert_stmt = conn.prepare(
+                            """
+                            INSERT INTO my_schema.table1 (u_uid, process_id, ins_tm, content)
+                            VALUES ($1, $2, NOW(), $3)
+                            """
+                        )
+                        for i in range(batch_size):
+                            uid = base + (worker_idx * 1000) + (batch_idx * batch_size) + i
+                            insert_stmt(uid, process_id, f"batch_{batch_idx}")
+            except Exception as exc:  # noqa: BLE001 - capture any worker errors
+                with lock:
+                    errors.append(exc)
+
+        threads = [threading.Thread(target=writer, args=(i,)) for i in range(worker_count)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        print("STEP: test_concurrent_batch_writes - validate count and cleanup")
+        with self.pool.connection(timeout=2.0) as conn:
+            count_stmt = conn.prepare(
+                "SELECT COUNT(*) FROM my_schema.table1 WHERE process_id = $1"
+            )
+            rows = count_stmt(process_id)
+            total_inserted = rows[0][0]
+            delete_stmt = conn.prepare(
+                "DELETE FROM my_schema.table1 WHERE process_id = $1"
+            )
+            delete_stmt(process_id)
+
+        stats = self.pool.stats()
+
+        self.assertEqual(len(errors), 0)
+        self.assertEqual(total_inserted, worker_count * total_batches * batch_size)
+        self.assertFalse(stats["closed"])
+
+    def test_connection_lifecycle_management(self):
+        print("STEP: test_connection_lifecycle_management - hold and expire connection")
+        pool = self._new_pool(min_size=1, max_size=2, max_lifetime=1.0, idle_timeout=5.0)
+        stats = {}
+        try:
+            conn = pool.borrow()
+            conn_id = id(conn._entry.conn)
+            time.sleep(1.2)
+            conn.close()
+
+            print("STEP: test_connection_lifecycle_management - borrow replacement")
+            new_conn = pool.borrow(timeout=1.0)
+            new_id = id(new_conn._entry.conn)
+            new_conn.close()
+            stats = pool.stats()
+        finally:
+            pool.close()
+
+        self.assertNotEqual(conn_id, new_id)
+        self.assertLessEqual(stats["total"], 2)
+
+    def test_connection_validation_replaces_bad_connection(self):
+        print("STEP: test_connection_validation_replaces_bad_connection - return closed conn")
+        conn = self.pool.borrow()
+        bad_id = id(conn._entry.conn)
+        conn._entry.conn.close()
+        self.pool.return_connection(conn)
+
+        print("STEP: test_connection_validation_replaces_bad_connection - borrow and validate")
+        with self.pool.connection(timeout=1.0) as conn2:
+            new_id = id(conn2._entry.conn)
+            rows = conn2.prepare("SELECT 1")()
+        stats = self.pool.stats()
+
+        self.assertEqual(rows[0][0], 1)
+        self.assertNotEqual(bad_id, new_id)
+        self.assertLessEqual(stats["total"], self.config.max_size)
+
+    def test_long_running_requests(self):
+        print("STEP: test_long_running_requests - run mixed workload")
+        errors: List[Exception] = []
+        lock = threading.Lock()
+        stop_at = time.monotonic() + 3.0
+
+        def load_worker():
+            while time.monotonic() < stop_at:
+                try:
+                    with self.pool.connection(timeout=2.0) as conn:
+                        conn.prepare("SELECT 1")()
+                        time.sleep(0.2)
+                        select_stmt = conn.prepare(
+                            "SELECT content FROM my_schema.table1 WHERE u_uid = $1"
+                        )
+                        rows = select_stmt(self.seed_uid)
+                        if rows[0][0] != "seed":
+                            raise AssertionError("Unexpected content")
+                except Exception as exc:  # noqa: BLE001 - capture any worker errors
+                    with lock:
+                        errors.append(exc)
+                    return
+
+        threads = [threading.Thread(target=load_worker) for _ in range(4)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        print("STEP: test_long_running_requests - assert pool stats")
+        stats = self.pool.stats()
+
+        self.assertEqual(len(errors), 0)
+        self.assertFalse(stats["closed"])
+        self.assertLessEqual(stats["total"], self.config.max_size)
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 28 - 0
tests/test_workflow.py

@@ -0,0 +1,28 @@
+import asyncio
+import unittest
+
+from app.pipeline.workflow import Workflow
+
+
+class WorkflowIntegrationTest(unittest.TestCase):
+    def test_workflow_executes_end_to_end(self):
+        wf = Workflow()
+        result = asyncio.run(
+            wf.execute_workflow(
+                {
+                    "dashboard_id": "DB001",
+                    "card_ids": ["C0001", "C0002"],
+                    "bbk": "BBK002",
+                    "user_request": "",
+                }
+            )
+        )
+
+        self.assertGreaterEqual(len(result), 1)
+        first = result[0]
+        self.assertEqual(first.dashboard_id, "DB001")
+        self.assertIn(first.card_id, ["C0001", "C0002"])
+
+
+if __name__ == "__main__":
+    unittest.main()