"""Utility layer for querying BI metadata stored inside DuckDB.""" from __future__ import annotations from pathlib import Path from threading import Lock from typing import Any, Dict, Iterable, List, Optional, Sequence import duckdb from app.const import DATA_DIR class DuckDBClient: """Thread-safe helper around ``duckdb.DuckDBPyConnection``.""" def __init__(self, db_path: Optional[Path] = None) -> None: default_path = DATA_DIR / "duckdb" / "bi_metadata.duckdb" path = Path(db_path or default_path) path.parent.mkdir(parents=True, exist_ok=True) self._conn = duckdb.connect(str(path)) self._lock = Lock() def _query(self, sql: str, params: Sequence[Any] | None = None) -> List[Dict[str, Any]]: params = params or [] with self._lock: cursor = self._conn.execute(sql, params) rows = cursor.fetchall() columns = [desc[0] for desc in cursor.description] return [dict(zip(columns, row)) for row in rows] def fetch_dashboard_cards( self, dashboard_id: str, card_ids: Optional[Iterable[str]] = None, bbk_id: Optional[str] = None, ) -> List[Dict[str, Any]]: """Return card metadata for the requested dashboard and optional card list.""" sql = [ "SELECT card_id, dashboard_id, dataset_id, dashboard_name, card_name, dataset_name, bbk_id", "FROM card_dataset_dashboard_mapping", "WHERE dashboard_id = ?", ] params: List[Any] = [dashboard_id] if bbk_id: sql.append("AND bbk_id = ?") params.append(bbk_id) if card_ids: placeholders = ",".join(["?"] * len(list(card_ids))) sql.append(f"AND card_id IN ({placeholders})") params.extend(list(card_ids)) statement = " ".join(sql) return self._query(statement, params) def fetch_card_definition(self, card_ids: Iterable[str]) -> List[Dict[str, Any]]: """Retrieve SELECT/WHERE/GROUP BY definitions for the provided cards.""" card_ids = list(card_ids) if not card_ids: return [] placeholders = ",".join(["?"] * len(card_ids)) statement = ( "SELECT card_id, card_name, card_desc, sql_select, sql_where, sql_groupby " "FROM card_info WHERE card_id IN (" + placeholders + ")" ) return self._query(statement, list(card_ids)) def fetch_card_filters(self, card_ids: Iterable[str]) -> List[Dict[str, Any]]: card_ids = list(card_ids) if not card_ids: return [] placeholders = ",".join(["?"] * len(card_ids)) statement = ( "SELECT card_id, filter_id, filter_type, where_clause, default_value, options " "FROM card_filter_info WHERE card_id IN (" + placeholders + ")" ) return self._query(statement, list(card_ids)) def fetch_dashboard_info(self, dashboard_id: str) -> Optional[Dict[str, Any]]: rows = self._query( "SELECT dashboard_id, dashboard_name, dashboard_desc, folder_path FROM dashboard_info WHERE dashboard_id = ?", [dashboard_id], ) return rows[0] if rows else None def fetch_dataset_ddl(self, dataset_ids: Iterable[str]) -> List[Dict[str, Any]]: dataset_ids = list(dataset_ids) if not dataset_ids: return [] placeholders = ",".join(["?"] * len(dataset_ids)) statement = ( "SELECT dataset_id, dataset_ddl FROM dataset_ddl WHERE dataset_id IN (" + placeholders + ")" ) return self._query(statement, list(dataset_ids)) __all__ = ["DuckDBClient"]