| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- """Utility layer for querying BI metadata stored inside DuckDB."""
- from __future__ import annotations
- from pathlib import Path
- from threading import Lock
- from typing import Any, Dict, Iterable, List, Optional, Sequence
- import duckdb
- from app.const import DATA_DIR
- class DuckDBClient:
- """Thread-safe helper around ``duckdb.DuckDBPyConnection``."""
- def __init__(self, db_path: Optional[Path] = None) -> None:
- default_path = DATA_DIR / "duckdb" / "bi_metadata.duckdb"
- path = Path(db_path or default_path)
- path.parent.mkdir(parents=True, exist_ok=True)
- self._conn = duckdb.connect(str(path))
- self._lock = Lock()
- def _query(self, sql: str, params: Sequence[Any] | None = None) -> List[Dict[str, Any]]:
- params = params or []
- with self._lock:
- cursor = self._conn.execute(sql, params)
- rows = cursor.fetchall()
- columns = [desc[0] for desc in cursor.description]
- return [dict(zip(columns, row)) for row in rows]
- def fetch_dashboard_cards(
- self,
- dashboard_id: str,
- card_ids: Optional[Iterable[str]] = None,
- bbk_id: Optional[str] = None,
- ) -> List[Dict[str, Any]]:
- """Return card metadata for the requested dashboard and optional card list."""
- sql = [
- "SELECT card_id, dashboard_id, dataset_id, dashboard_name, card_name, dataset_name, bbk_id",
- "FROM card_dataset_dashboard_mapping",
- "WHERE dashboard_id = ?",
- ]
- params: List[Any] = [dashboard_id]
- if bbk_id:
- sql.append("AND bbk_id = ?")
- params.append(bbk_id)
- if card_ids:
- placeholders = ",".join(["?"] * len(list(card_ids)))
- sql.append(f"AND card_id IN ({placeholders})")
- params.extend(list(card_ids))
- statement = " ".join(sql)
- return self._query(statement, params)
- def fetch_card_definition(self, card_ids: Iterable[str]) -> List[Dict[str, Any]]:
- """Retrieve SELECT/WHERE/GROUP BY definitions for the provided cards."""
- card_ids = list(card_ids)
- if not card_ids:
- return []
- placeholders = ",".join(["?"] * len(card_ids))
- statement = (
- "SELECT card_id, card_name, card_desc, sql_select, sql_where, sql_groupby "
- "FROM card_info WHERE card_id IN (" + placeholders + ")"
- )
- return self._query(statement, list(card_ids))
- def fetch_card_filters(self, card_ids: Iterable[str]) -> List[Dict[str, Any]]:
- card_ids = list(card_ids)
- if not card_ids:
- return []
- placeholders = ",".join(["?"] * len(card_ids))
- statement = (
- "SELECT card_id, filter_id, filter_type, where_clause, default_value, options "
- "FROM card_filter_info WHERE card_id IN (" + placeholders + ")"
- )
- return self._query(statement, list(card_ids))
- def fetch_dashboard_info(self, dashboard_id: str) -> Optional[Dict[str, Any]]:
- rows = self._query(
- "SELECT dashboard_id, dashboard_name, dashboard_desc, folder_path FROM dashboard_info WHERE dashboard_id = ?",
- [dashboard_id],
- )
- return rows[0] if rows else None
- def fetch_dataset_ddl(self, dataset_ids: Iterable[str]) -> List[Dict[str, Any]]:
- dataset_ids = list(dataset_ids)
- if not dataset_ids:
- return []
- placeholders = ",".join(["?"] * len(dataset_ids))
- statement = (
- "SELECT dataset_id, dataset_ddl FROM dataset_ddl WHERE dataset_id IN ("
- + placeholders
- + ")"
- )
- return self._query(statement, list(dataset_ids))
- __all__ = ["DuckDBClient"]
|