# duckdb_client.py (3.7 KB)
  1. """Utility layer for querying BI metadata stored inside DuckDB."""
  2. from __future__ import annotations
  3. from pathlib import Path
  4. from threading import Lock
  5. from typing import Any, Dict, Iterable, List, Optional, Sequence
  6. import duckdb
  7. from app.const import DATA_DIR
  8. class DuckDBClient:
  9. """Thread-safe helper around ``duckdb.DuckDBPyConnection``."""
  10. def __init__(self, db_path: Optional[Path] = None) -> None:
  11. default_path = DATA_DIR / "duckdb" / "bi_metadata.duckdb"
  12. path = Path(db_path or default_path)
  13. path.parent.mkdir(parents=True, exist_ok=True)
  14. self._conn = duckdb.connect(str(path))
  15. self._lock = Lock()
  16. def _query(self, sql: str, params: Sequence[Any] | None = None) -> List[Dict[str, Any]]:
  17. params = params or []
  18. with self._lock:
  19. cursor = self._conn.execute(sql, params)
  20. rows = cursor.fetchall()
  21. columns = [desc[0] for desc in cursor.description]
  22. return [dict(zip(columns, row)) for row in rows]
  23. def fetch_dashboard_cards(
  24. self,
  25. dashboard_id: str,
  26. card_ids: Optional[Iterable[str]] = None,
  27. bbk_id: Optional[str] = None,
  28. ) -> List[Dict[str, Any]]:
  29. """Return card metadata for the requested dashboard and optional card list."""
  30. sql = [
  31. "SELECT card_id, dashboard_id, dataset_id, dashboard_name, card_name, dataset_name, bbk_id",
  32. "FROM card_dataset_dashboard_mapping",
  33. "WHERE dashboard_id = ?",
  34. ]
  35. params: List[Any] = [dashboard_id]
  36. if bbk_id:
  37. sql.append("AND bbk_id = ?")
  38. params.append(bbk_id)
  39. if card_ids:
  40. placeholders = ",".join(["?"] * len(list(card_ids)))
  41. sql.append(f"AND card_id IN ({placeholders})")
  42. params.extend(list(card_ids))
  43. statement = " ".join(sql)
  44. return self._query(statement, params)
  45. def fetch_card_definition(self, card_ids: Iterable[str]) -> List[Dict[str, Any]]:
  46. """Retrieve SELECT/WHERE/GROUP BY definitions for the provided cards."""
  47. card_ids = list(card_ids)
  48. if not card_ids:
  49. return []
  50. placeholders = ",".join(["?"] * len(card_ids))
  51. statement = (
  52. "SELECT card_id, card_name, card_desc, sql_select, sql_where, sql_groupby "
  53. "FROM card_info WHERE card_id IN (" + placeholders + ")"
  54. )
  55. return self._query(statement, list(card_ids))
  56. def fetch_card_filters(self, card_ids: Iterable[str]) -> List[Dict[str, Any]]:
  57. card_ids = list(card_ids)
  58. if not card_ids:
  59. return []
  60. placeholders = ",".join(["?"] * len(card_ids))
  61. statement = (
  62. "SELECT card_id, filter_id, filter_type, where_clause, default_value, options "
  63. "FROM card_filter_info WHERE card_id IN (" + placeholders + ")"
  64. )
  65. return self._query(statement, list(card_ids))
  66. def fetch_dashboard_info(self, dashboard_id: str) -> Optional[Dict[str, Any]]:
  67. rows = self._query(
  68. "SELECT dashboard_id, dashboard_name, dashboard_desc, folder_path FROM dashboard_info WHERE dashboard_id = ?",
  69. [dashboard_id],
  70. )
  71. return rows[0] if rows else None
  72. def fetch_dataset_ddl(self, dataset_ids: Iterable[str]) -> List[Dict[str, Any]]:
  73. dataset_ids = list(dataset_ids)
  74. if not dataset_ids:
  75. return []
  76. placeholders = ",".join(["?"] * len(dataset_ids))
  77. statement = (
  78. "SELECT dataset_id, dataset_ddl FROM dataset_ddl WHERE dataset_id IN ("
  79. + placeholders
  80. + ")"
  81. )
  82. return self._query(statement, list(dataset_ids))
# Names exported by ``from <module> import *``: only the client class.
__all__ = ["DuckDBClient"]