"""Generate mock BI card rows (one parquet file) from a dashboard parquet file.

For every selected dashboard a domain is inferred, a random subset of that
domain's card blueprints is sampled with a seeded RNG, and one card record is
built per sampled blueprint.  The result is written to a parquet file, either
replacing it or appending to its current content.
"""
from __future__ import annotations

import argparse
import hashlib
import json
import random
from pathlib import Path

import pandas as pd

from card_blueprints import DOMAIN_KEYWORDS, LEVEL1_DOMAIN_MAP, get_card_blueprints

# Two directory levels above this file; the data files live under ROOT/data.
ROOT = Path(__file__).resolve().parents[1]
DASHBOARD_PATH = ROOT / "data" / "dashboard.parquet"
OUTPUT_PATH = ROOT / "data" / "card_info.parquet"
RANDOM_SEED = 20260507

# Bounds for the number of cards sampled per dashboard (both are clamped to
# the number of blueprints actually available for the dashboard's domain).
MIN_CARDS_PER_DASHBOARD = 10
MAX_CARDS_PER_DASHBOARD = 20

CARD_COLUMNS = [
    "card_id",
    "bbk_id",
    "bbk_name",
    "card_name",
    "ds_id",
    "ds_name",
    "dash_id",
    "dash_name",
    "card_type_cd",
    "field_alias",
    "field_name",
    "filters_field_name",
    "filters_field_value",
    "sort_field_name",
    "sort_way",
    "num_value_field_name",
    "num_value_field_alias",
    "num_value_field_merge_way",
    "folder_name",
    "folder_all_route",
]


def to_json(values: list[str]) -> str:
    """Serialize *values* to a JSON string, keeping non-ASCII characters as-is."""
    return json.dumps(values, ensure_ascii=False)


def make_id(prefix: str, ordinal: int) -> str:
    """Return a deterministic 24-character hex id for ``(prefix, ordinal)``.

    The id is the first 24 hex digits of the MD5 digest of
    ``"{prefix}-{ordinal}-{RANDOM_SEED}"``; MD5 is used only as a stable
    fingerprint here, not for anything security related.
    """
    raw = f"{prefix}-{ordinal}-{RANDOM_SEED}".encode("utf-8")
    return hashlib.md5(raw).hexdigest()[:24]


def parse_branch_names(
    raw_branch_names: list[str] | None,
    available_branch_names: set[str],
) -> list[str] | None:
    """Flatten and validate the ``--branch-name`` command line values.

    Each raw value may hold several comma-separated names.  Names are
    stripped, blanks are dropped and duplicates are removed while keeping the
    order of first appearance.

    Returns:
        ``None`` when no values were given at all, otherwise the cleaned list.

    Raises:
        ValueError: if any name is not in *available_branch_names*.
    """
    if not raw_branch_names:
        return None

    branch_names: list[str] = []
    for raw_value in raw_branch_names:
        for piece in raw_value.split(","):
            candidate = piece.strip()
            if candidate and candidate not in branch_names:
                branch_names.append(candidate)

    unknown_branch_names = [name for name in branch_names if name not in available_branch_names]
    if unknown_branch_names:
        supported = "、".join(sorted(available_branch_names))
        unknown = "、".join(unknown_branch_names)
        raise ValueError(
            f"unsupported branch name: {unknown}. Available branch names in dashboards: {supported}"
        )
    return branch_names


def infer_domain(dashboard: pd.Series) -> str:
    """Infer the business domain of a dashboard row.

    An exact match of ``level1_dept_name`` in ``LEVEL1_DOMAIN_MAP`` wins.
    Otherwise the first domain in ``DOMAIN_KEYWORDS`` with a keyword contained
    in the department name, display name or folder route is used.  When
    nothing matches, ``"零售"`` is returned.
    """
    level1_name = str(dashboard.get("level1_dept_name", ""))
    if level1_name in LEVEL1_DOMAIN_MAP:
        return LEVEL1_DOMAIN_MAP[level1_name]

    dash_name = str(dashboard.get("dash_dsply_name", ""))
    folder_route = str(dashboard.get("folder_all_route", ""))
    text = f"{level1_name} {dash_name} {folder_route}"
    for domain, keywords in DOMAIN_KEYWORDS.items():
        if any(keyword in text for keyword in keywords):
            return domain
    return "零售"


def make_dataset_id(dataset_prefix: str, dataset_name: str) -> str:
    """Return the deterministic id for a dataset identified by prefix and name."""
    return make_id(f"dataset-{dataset_prefix}-{dataset_name}", 1)


def resolve_filter_value(filter_spec: dict[str, str], dashboard: pd.Series) -> str:
    """Resolve the concrete value of one blueprint filter for *dashboard*.

    Resolution order:
      1. a literal ``"value"`` entry;
      2. ``"value_from"`` referencing ``dashboard.bbk_id`` / ``dashboard.bbk_name``;
      3. ``"value_kind" == "month_end"``, which yields ``"2026-05"`` when the
         filter's field name contains ``"月份"`` and ``"2026-05-31"`` otherwise.

    Raises:
        ValueError: if none of the supported forms applies.
        KeyError: if steps 1 and 2 do not apply and the spec has no ``"field"`` entry.
    """
    if "value" in filter_spec:
        return str(filter_spec["value"])

    value_from = filter_spec.get("value_from")
    if value_from == "dashboard.bbk_id":
        return str(dashboard["bbk_id"])
    if value_from == "dashboard.bbk_name":
        return str(dashboard["bbk_name"])

    value_kind = filter_spec.get("value_kind")
    field_name = str(filter_spec["field"])
    if value_kind == "month_end":
        return "2026-05" if "月份" in field_name else "2026-05-31"
    raise ValueError(f"unsupported filter spec: {filter_spec}")


def _leaf_folder_name(dashboard: pd.Series) -> str:
    """Return the last non-blank segment of the dashboard's folder route.

    ``str.split`` never returns an empty list, so an empty route or a route
    with a trailing ``/`` would otherwise yield an empty folder name; in that
    case the inferred domain is used as the folder name instead.
    """
    route = str(dashboard["folder_all_route"])
    segments = [segment for segment in route.split("/") if segment.strip()]
    return segments[-1] if segments else infer_domain(dashboard)


def build_card_record(
    global_ordinal: int,
    dashboard: pd.Series,
    spec: dict[str, object],
) -> dict[str, str]:
    """Build one card row (keys match ``CARD_COLUMNS``) from a blueprint.

    Args:
        global_ordinal: ordinal used to derive the deterministic ``card_id``.
        dashboard: dashboard row supplying the branch, dashboard and folder fields.
        spec: card blueprint; the keys ``dataset_prefix``, ``dataset``,
            ``card_name``, ``fields``, ``metrics``, ``filters`` and ``sort``
            are read.

    List-valued attributes are stored as JSON strings.
    """
    dataset_prefix = str(spec["dataset_prefix"])
    dataset_name = str(spec["dataset"])
    fields = list(spec["fields"])
    metrics = list(spec["metrics"])
    filters = list(spec["filters"])
    sort = list(spec["sort"])

    filter_names = [str(item["field"]) for item in filters]
    filter_values = [resolve_filter_value(item, dashboard) for item in filters]
    metric_names = [str(item["field"]) for item in metrics]
    metric_aliases = [str(item["alias"]) for item in metrics]
    metric_aggs = [str(item["agg"]) for item in metrics]
    sort_names = [str(item["field"]) for item in sort]
    sort_ways = [str(item["way"]) for item in sort]

    bbk_id = str(dashboard["bbk_id"])
    fields_json = to_json(fields)

    return {
        "card_id": make_id("card", global_ordinal),
        "bbk_id": bbk_id,
        "bbk_name": str(dashboard["bbk_name"]),
        "card_name": str(spec["card_name"]),
        "ds_id": make_dataset_id(dataset_prefix, dataset_name),
        "ds_name": f"{dataset_prefix}_{bbk_id}_{dataset_name}",
        "dash_id": str(dashboard["dash_id"]),
        "dash_name": str(dashboard["dash_dsply_name"]),
        "card_type_cd": "图表",
        # The blueprint field list is used for both the alias and the name column.
        "field_alias": fields_json,
        "field_name": fields_json,
        "filters_field_name": to_json(filter_names),
        "filters_field_value": to_json(filter_values),
        "sort_field_name": to_json(sort_names),
        "sort_way": to_json(sort_ways),
        "num_value_field_name": to_json(metric_names),
        "num_value_field_alias": to_json(metric_aliases),
        "num_value_field_merge_way": to_json(metric_aggs),
        "folder_name": _leaf_folder_name(dashboard),
        "folder_all_route": str(dashboard["folder_all_route"]),
    }


def select_dashboards(
    dashboards: pd.DataFrame,
    branch_names: list[str] | None,
    count: int | None,
) -> pd.DataFrame:
    """Filter dashboards by branch name, keep the first *count* rows, reset the index.

    A falsy *branch_names* disables the branch filter; ``count=None`` keeps
    every remaining row.
    """
    selected = dashboards
    if branch_names:
        selected = selected[selected["bbk_name"].isin(branch_names)]
    if count is not None:
        selected = selected.head(count)
    return selected.reset_index(drop=True)


def choose_card_specs(rng: random.Random, domain: str) -> list[dict[str, object]]:
    """Sample a random subset of the card blueprints registered for *domain*.

    Between ``MIN_CARDS_PER_DASHBOARD`` and ``MAX_CARDS_PER_DASHBOARD``
    blueprints are drawn without replacement.  Both bounds are clamped to the
    number of blueprints available, so a domain with fewer blueprints than the
    minimum yields all of them instead of failing with an "empty range" error.

    Raises:
        ValueError: if the domain has no blueprints at all.
    """
    specs = get_card_blueprints(domain)
    if not specs:
        raise ValueError(f"no card blueprints available for domain: {domain}")
    upper = min(MAX_CARDS_PER_DASHBOARD, len(specs))
    lower = min(MIN_CARDS_PER_DASHBOARD, upper)
    card_count = rng.randint(lower, upper)
    return rng.sample(specs, card_count)


def make_cards(
    dashboards: pd.DataFrame,
    start_ordinal: int = 1,
) -> pd.DataFrame:
    """Generate and validate the card rows for every dashboard in *dashboards*.

    Args:
        dashboards: dashboard rows to generate cards for.
        start_ordinal: ordinal of the first generated card; it also offsets
            the RNG seed, so different starting ordinals sample differently.

    Returns:
        A DataFrame with the columns listed in ``CARD_COLUMNS``.
    """
    rng = random.Random(RANDOM_SEED + start_ordinal - 1)
    records: list[dict[str, str]] = []
    global_ordinal = start_ordinal
    for _, dashboard in dashboards.iterrows():
        domain = infer_domain(dashboard)
        for spec in choose_card_specs(rng, domain):
            records.append(build_card_record(global_ordinal, dashboard, spec))
            global_ordinal += 1

    cards = pd.DataFrame(records, columns=CARD_COLUMNS)
    validate_generated_cards(cards)
    return cards


def validate_generated_cards(cards: pd.DataFrame) -> None:
    """Check that the parallel JSON list columns of every card have equal lengths.

    Raises:
        ValueError: on the first card whose metric, filter or sort lists
            disagree in length.
    """
    for row in cards.itertuples(index=False):
        metric_names = json.loads(row.num_value_field_name)
        metric_aliases = json.loads(row.num_value_field_alias)
        metric_aggs = json.loads(row.num_value_field_merge_way)
        filter_names = json.loads(row.filters_field_name)
        filter_values = json.loads(row.filters_field_value)
        sort_names = json.loads(row.sort_field_name)
        sort_ways = json.loads(row.sort_way)

        if not (len(metric_names) == len(metric_aliases) == len(metric_aggs)):
            raise ValueError(f"metric length mismatch in card {row.card_id}")
        if len(filter_names) != len(filter_values):
            raise ValueError(f"filter length mismatch in card {row.card_id}")
        if len(sort_names) != len(sort_ways):
            raise ValueError(f"sort length mismatch in card {row.card_id}")


def parse_args() -> argparse.Namespace:
    """Parse the command line options of the generator script."""
    parser = argparse.ArgumentParser(description="Generate mock BI card parquet data.")
    parser.add_argument(
        "--mode",
        choices=["overwrite", "append"],
        default="overwrite",
        help="overwrite replaces the parquet file; append inserts generated rows into the existing parquet file.",
    )
    parser.add_argument(
        "--count",
        type=int,
        default=None,
        help="number of dashboards to generate cards for. Defaults to all selected dashboards.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=OUTPUT_PATH,
        help="target parquet path.",
    )
    parser.add_argument(
        "--branch-name",
        action="append",
        help=(
            "branch names to generate, separated by comma or passed repeatedly. "
            "Defaults to all branches in dashboard parquet."
        ),
    )
    parser.add_argument(
        "--dashboard-input",
        type=Path,
        default=DASHBOARD_PATH,
        help="source dashboard parquet path.",
    )
    return parser.parse_args()


def main() -> None:
    """Read the dashboards, generate cards, write the parquet file, print a summary.

    Raises:
        ValueError: for a non-positive ``--count``, an unknown branch name, or
            an empty dashboard selection.
        FileNotFoundError: if the dashboard parquet file does not exist.
    """
    args = parse_args()
    if args.count is not None and args.count <= 0:
        raise ValueError("--count must be a positive integer")

    dashboard_path = args.dashboard_input.resolve()
    if not dashboard_path.exists():
        raise FileNotFoundError(f"dashboard parquet not found: {dashboard_path}")

    dashboards = pd.read_parquet(dashboard_path)
    branch_names = parse_branch_names(args.branch_name, set(dashboards["bbk_name"]))
    selected_dashboards = select_dashboards(dashboards, branch_names, args.count)
    if selected_dashboards.empty:
        raise ValueError("no dashboards selected for card generation")

    output_path = args.output.resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # In append mode the existing rows are kept and new ordinals continue
    # after them; a missing output file behaves like overwrite mode.
    existing = pd.DataFrame()
    if args.mode == "append" and output_path.exists():
        existing = pd.read_parquet(output_path)

    new_cards = make_cards(selected_dashboards, start_ordinal=len(existing) + 1)
    cards = pd.concat([existing, new_cards], ignore_index=True) if not existing.empty else new_cards
    # Re-validate the combined frame so pre-existing rows are checked as well.
    validate_generated_cards(cards)
    cards.to_parquet(output_path, index=False)

    duplicate_card_id = int(cards["card_id"].duplicated().sum())
    card_count_by_dash = new_cards.groupby("dash_id")["card_id"].count()
    selected_branch_names = branch_names or sorted(selected_dashboards["bbk_name"].unique())

    print(f"mode: {args.mode}")
    print(f"dashboard input: {dashboard_path}")
    print(f"branch names: {', '.join(selected_branch_names)}")
    print(f"selected dashboards: {len(selected_dashboards)}")
    print(f"generated rows: {len(new_cards)}")
    print(f"wrote total rows: {len(cards)} to {output_path}")
    print(f"duplicate card_id: {duplicate_card_id}")
    print(f"cards per dashboard: {int(card_count_by_dash.min())} {int(card_count_by_dash.max())}")
    print("bbk_name:", new_cards["bbk_name"].value_counts().to_dict())


if __name__ == "__main__":
    main()