| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- from __future__ import annotations
- import argparse
- import hashlib
- import json
- import random
- from pathlib import Path
- import pandas as pd
- from card_blueprints import DOMAIN_KEYWORDS, LEVEL1_DOMAIN_MAP, get_card_blueprints
# Directory one level above the folder that contains this file.
ROOT = Path(__file__).resolve().parents[1]

# Default parquet locations for the dashboard input and the card output.
DASHBOARD_PATH = ROOT.joinpath("data", "dashboard.parquet")
OUTPUT_PATH = ROOT.joinpath("data", "card_info.parquet")

# Mixed into generated ids and used to seed the random generator.
RANDOM_SEED = 20260507

# Column order of the generated card table.
CARD_COLUMNS = [
    # card / branch identity
    "card_id", "bbk_id", "bbk_name", "card_name",
    # dataset and dashboard the card belongs to
    "ds_id", "ds_name", "dash_id", "dash_name",
    "card_type_cd",
    # plain fields (JSON-encoded lists)
    "field_alias", "field_name",
    # filters (JSON-encoded lists)
    "filters_field_name", "filters_field_value",
    # sorting (JSON-encoded lists)
    "sort_field_name", "sort_way",
    # metrics (JSON-encoded lists)
    "num_value_field_name", "num_value_field_alias", "num_value_field_merge_way",
    # folder placement
    "folder_name", "folder_all_route",
]
def to_json(values: list[str]) -> str:
    """Serialize *values* as a JSON array, leaving non-ASCII characters unescaped."""
    encoded = json.dumps(values, ensure_ascii=False)
    return encoded
def make_id(prefix: str, ordinal: int) -> str:
    """Return a deterministic 24-character hex id for ``prefix`` and ``ordinal``.

    The id is the first 24 hex digits of the MD5 digest of
    ``"<prefix>-<ordinal>-<RANDOM_SEED>"`` encoded as UTF-8, so the same
    arguments always produce the same id.
    """
    seed_text = f"{prefix}-{ordinal}-{RANDOM_SEED}"
    digest = hashlib.md5(seed_text.encode("utf-8"))
    return digest.hexdigest()[:24]
def parse_branch_names(raw_branch_names: list[str] | None, available_branch_names: set[str]) -> list[str] | None:
    """Normalize branch-name arguments into an ordered, de-duplicated list.

    Each raw value may hold several comma-separated names. Names are stripped,
    blanks are dropped, and the first occurrence of each name wins.

    Returns:
        ``None`` when ``raw_branch_names`` is ``None`` or empty, otherwise the
        list of parsed names (possibly empty if every piece was blank).

    Raises:
        ValueError: if any parsed name is not in ``available_branch_names``.
    """
    if not raw_branch_names:
        return None

    # dict keys preserve first-seen order while discarding repeats.
    seen: dict[str, None] = {}
    for chunk in raw_branch_names:
        for piece in chunk.split(","):
            cleaned = piece.strip()
            if cleaned:
                seen.setdefault(cleaned, None)
    branch_names = list(seen)

    rejected = [name for name in branch_names if name not in available_branch_names]
    if not rejected:
        return branch_names

    supported = "、".join(sorted(available_branch_names))
    unknown = "、".join(rejected)
    raise ValueError(f"unsupported branch name: {unknown}. Available branch names in dashboards: {supported}")
def infer_domain(dashboard: pd.Series) -> str:
    """Pick the business domain for a dashboard row.

    The ``level1_dept_name`` value is looked up in ``LEVEL1_DOMAIN_MAP`` first.
    If it is not mapped, the department name, dashboard display name and folder
    route are searched for the keywords in ``DOMAIN_KEYWORDS``; the first domain
    (in mapping order) with a matching keyword wins. Falls back to "零售".
    """
    level1_name = str(dashboard.get("level1_dept_name", ""))
    if level1_name in LEVEL1_DOMAIN_MAP:
        return LEVEL1_DOMAIN_MAP[level1_name]

    haystack = " ".join(
        [
            level1_name,
            str(dashboard.get("dash_dsply_name", "")),
            str(dashboard.get("folder_all_route", "")),
        ]
    )
    keyword_hits = (
        domain
        for domain, keywords in DOMAIN_KEYWORDS.items()
        if any(keyword in haystack for keyword in keywords)
    )
    return next(keyword_hits, "零售")
def make_dataset_id(dataset_prefix: str, dataset_name: str) -> str:
    """Return the deterministic id for a dataset, derived from its prefix and name."""
    id_prefix = f"dataset-{dataset_prefix}-{dataset_name}"
    # The ordinal is fixed at 1, so the id depends only on prefix and name.
    return make_id(id_prefix, 1)
def resolve_filter_value(filter_spec: dict[str, str], dashboard: pd.Series) -> str:
    """Resolve the concrete filter value described by ``filter_spec``.

    Resolution order:

    1. a literal ``"value"`` entry, returned as a string;
    2. ``"value_from"`` equal to ``"dashboard.bbk_id"`` or
       ``"dashboard.bbk_name"``, read from ``dashboard``;
    3. ``"value_kind"`` equal to ``"month_end"``, which yields ``"2026-05"``
       when the field name contains "月份" and ``"2026-05-31"`` otherwise.

    Raises:
        ValueError: if the spec matches none of the supported forms.
        KeyError: if a ``month_end`` spec has no ``"field"`` entry, or the
            dashboard lacks the referenced column.
    """
    if "value" in filter_spec:
        return str(filter_spec["value"])

    value_from = filter_spec.get("value_from")
    if value_from == "dashboard.bbk_id":
        return str(dashboard["bbk_id"])
    if value_from == "dashboard.bbk_name":
        return str(dashboard["bbk_name"])

    if filter_spec.get("value_kind") == "month_end":
        # Only this branch needs the field name; reading it earlier would turn
        # an unsupported spec without "field" into a bare KeyError instead of
        # the descriptive ValueError below.
        field_name = str(filter_spec["field"])
        return "2026-05" if "月份" in field_name else "2026-05-31"

    raise ValueError(f"unsupported filter spec: {filter_spec}")
def build_card_record(
    global_ordinal: int,
    dashboard: pd.Series,
    spec: dict[str, object],
) -> dict[str, str]:
    """Build one card row from a dashboard row and a card blueprint.

    Args:
        global_ordinal: ordinal used to derive the card id.
        dashboard: dashboard row; ``bbk_id``, ``bbk_name``, ``dash_id``,
            ``dash_dsply_name`` and ``folder_all_route`` are read from it.
        spec: blueprint with ``dataset_prefix``, ``dataset``, ``card_name``,
            ``fields``, ``metrics`` (items with ``field``/``alias``/``agg``),
            ``filters`` (items with ``field`` plus a value description) and
            ``sort`` (items with ``field``/``way``).

    Returns:
        A dict with one string per card column; list-valued attributes are
        JSON-encoded.

    Raises:
        ValueError: if a filter spec cannot be resolved.
        KeyError: if ``spec`` or ``dashboard`` lacks a required key.
    """
    dataset_prefix = str(spec["dataset_prefix"])
    dataset_name = str(spec["dataset"])
    fields = list(spec["fields"])
    metrics = list(spec["metrics"])
    filters = list(spec["filters"])
    sort = list(spec["sort"])

    filter_names = [str(item["field"]) for item in filters]
    filter_values = [resolve_filter_value(item, dashboard) for item in filters]
    metric_names = [str(item["field"]) for item in metrics]
    metric_aliases = [str(item["alias"]) for item in metrics]
    metric_aggs = [str(item["agg"]) for item in metrics]
    sort_names = [str(item["field"]) for item in sort]
    sort_ways = [str(item["way"]) for item in sort]

    folder_route = str(dashboard["folder_all_route"])
    # str.split never returns an empty list, so empty segments are dropped
    # explicitly; otherwise an empty route or a trailing "/" would produce an
    # empty folder name and the domain fallback could never be reached.
    folder_parts = [part for part in folder_route.split("/") if part]
    folder_name = folder_parts[-1] if folder_parts else infer_domain(dashboard)

    return {
        "card_id": make_id("card", global_ordinal),
        "bbk_id": str(dashboard["bbk_id"]),
        "bbk_name": str(dashboard["bbk_name"]),
        "card_name": str(spec["card_name"]),
        "ds_id": make_dataset_id(dataset_prefix, dataset_name),
        "ds_name": f"{dataset_prefix}_{str(dashboard['bbk_id'])}_{dataset_name}",
        "dash_id": str(dashboard["dash_id"]),
        "dash_name": str(dashboard["dash_dsply_name"]),
        "card_type_cd": "图表",
        # Aliases and names are both taken from the blueprint's field list.
        "field_alias": to_json(fields),
        "field_name": to_json(fields),
        "filters_field_name": to_json(filter_names),
        "filters_field_value": to_json(filter_values),
        "sort_field_name": to_json(sort_names),
        "sort_way": to_json(sort_ways),
        "num_value_field_name": to_json(metric_names),
        "num_value_field_alias": to_json(metric_aliases),
        "num_value_field_merge_way": to_json(metric_aggs),
        "folder_name": folder_name,
        "folder_all_route": folder_route,
    }
def select_dashboards(dashboards: pd.DataFrame, branch_names: list[str] | None, count: int | None) -> pd.DataFrame:
    """Filter dashboards by branch name and optionally keep only the first rows.

    Args:
        dashboards: source table with a ``bbk_name`` column.
        branch_names: branches to keep; a falsy value keeps every branch.
        count: number of leading rows to keep after filtering; ``None`` keeps all.

    Returns:
        The selected rows with a fresh 0-based index.
    """
    subset = dashboards
    if branch_names:
        branch_mask = subset["bbk_name"].isin(branch_names)
        subset = subset[branch_mask]
    if count is not None:
        subset = subset.head(count)
    return subset.reset_index(drop=True)
def choose_card_specs(rng: random.Random, domain: str) -> list[dict[str, object]]:
    """Randomly pick the card blueprints to generate for ``domain``.

    Between 10 and 20 blueprints are sampled without replacement. When the
    domain offers fewer than 10 blueprints, the range shrinks to whatever is
    available instead of failing.

    Args:
        rng: random generator that drives both the count and the sample.
        domain: domain name passed to ``get_card_blueprints``.

    Returns:
        The sampled blueprints, in sample order.
    """
    specs = get_card_blueprints(domain)
    upper = min(20, len(specs))
    # randint raises ValueError on an empty range, so the lower bound must not
    # exceed the number of available blueprints.
    lower = min(10, upper)
    card_count = rng.randint(lower, upper)
    return rng.sample(specs, card_count)
def make_cards(
    dashboards: pd.DataFrame,
    start_ordinal: int = 1,
) -> pd.DataFrame:
    """Generate the card table for the given dashboards.

    Args:
        dashboards: dashboard rows to generate cards for, processed in order.
        start_ordinal: ordinal of the first generated card; later cards count
            up from it. It also offsets the random seed.

    Returns:
        A DataFrame with the ``CARD_COLUMNS`` columns, already validated.

    Raises:
        ValueError: if a generated row fails validation.
    """
    rng = random.Random(RANDOM_SEED + start_ordinal - 1)
    rows: list[dict[str, str]] = []
    next_ordinal = start_ordinal

    for _, dashboard in dashboards.iterrows():
        chosen_specs = choose_card_specs(rng, infer_domain(dashboard))
        for offset, spec in enumerate(chosen_specs):
            rows.append(build_card_record(next_ordinal + offset, dashboard, spec))
        # Advance past every ordinal consumed by this dashboard.
        next_ordinal += len(chosen_specs)

    cards = pd.DataFrame(rows, columns=CARD_COLUMNS)
    validate_generated_cards(cards)
    return cards
def validate_generated_cards(cards: pd.DataFrame) -> None:
    """Check that the parallel JSON list columns of every card have equal lengths.

    Three groups are checked per row: metric name/alias/aggregation, filter
    name/value, and sort name/direction.

    Raises:
        ValueError: naming the offending ``card_id`` on the first mismatch.
    """
    json_columns = (
        "num_value_field_name",
        "num_value_field_alias",
        "num_value_field_merge_way",
        "filters_field_name",
        "filters_field_value",
        "sort_field_name",
        "sort_way",
    )
    for row in cards.itertuples(index=False):
        # Decode every column first so malformed JSON surfaces before any
        # length comparison is made.
        decoded = {column: json.loads(getattr(row, column)) for column in json_columns}

        metric_name_count = len(decoded["num_value_field_name"])
        metric_alias_count = len(decoded["num_value_field_alias"])
        if (
            metric_name_count != metric_alias_count
            or metric_alias_count != len(decoded["num_value_field_merge_way"])
        ):
            raise ValueError(f"metric length mismatch in card {row.card_id}")

        if len(decoded["filters_field_name"]) != len(decoded["filters_field_value"]):
            raise ValueError(f"filter length mismatch in card {row.card_id}")

        if len(decoded["sort_field_name"]) != len(decoded["sort_way"]):
            raise ValueError(f"sort length mismatch in card {row.card_id}")
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the card generator.

    Options: ``--mode`` (``overwrite`` or ``append``), ``--count``,
    ``--output``, ``--branch-name`` (repeatable) and ``--dashboard-input``.
    """
    parser = argparse.ArgumentParser(description="Generate mock BI card parquet data.")

    # (flag, add_argument keyword arguments), registered in display order.
    option_table = [
        (
            "--mode",
            {
                "choices": ["overwrite", "append"],
                "default": "overwrite",
                "help": "overwrite replaces the parquet file; append inserts generated rows into the existing parquet file.",
            },
        ),
        (
            "--count",
            {
                "type": int,
                "default": None,
                "help": "number of dashboards to generate cards for. Defaults to all selected dashboards.",
            },
        ),
        (
            "--output",
            {
                "type": Path,
                "default": OUTPUT_PATH,
                "help": "target parquet path.",
            },
        ),
        (
            "--branch-name",
            {
                "action": "append",
                "help": (
                    "branch names to generate, separated by comma or passed repeatedly. "
                    "Defaults to all branches in dashboard parquet."
                ),
            },
        ),
        (
            "--dashboard-input",
            {
                "type": Path,
                "default": DASHBOARD_PATH,
                "help": "source dashboard parquet path.",
            },
        ),
    ]
    for flag, options in option_table:
        parser.add_argument(flag, **options)

    return parser.parse_args()
def main() -> None:
    """Generate cards for the selected dashboards and write them to parquet.

    Reads the dashboard parquet, narrows it by branch name and count, generates
    and validates the cards, writes the result (optionally appended to the
    existing output file) and prints a short summary.

    Raises:
        ValueError: for a non-positive ``--count``, an unknown branch name, an
            empty dashboard selection, or rows that fail validation.
        FileNotFoundError: if the dashboard parquet does not exist.
    """
    args = parse_args()
    count = args.count
    if count is not None and count <= 0:
        raise ValueError("--count must be a positive integer")

    dashboard_path = args.dashboard_input.resolve()
    if not dashboard_path.exists():
        raise FileNotFoundError(f"dashboard parquet not found: {dashboard_path}")
    dashboards = pd.read_parquet(dashboard_path)

    known_branches = set(dashboards["bbk_name"])
    branch_names = parse_branch_names(args.branch_name, known_branches)
    selected_dashboards = select_dashboards(dashboards, branch_names, count)
    if selected_dashboards.empty:
        raise ValueError("no dashboards selected for card generation")

    output_path = args.output.resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Existing rows are loaded only when appending to a file that is present.
    appending = args.mode == "append" and output_path.exists()
    existing = pd.read_parquet(output_path) if appending else pd.DataFrame()

    # New ordinals continue after the rows already stored.
    new_cards = make_cards(selected_dashboards, start_ordinal=len(existing) + 1)
    if existing.empty:
        cards = new_cards
    else:
        cards = pd.concat([existing, new_cards], ignore_index=True)
    validate_generated_cards(cards)
    cards.to_parquet(output_path, index=False)

    duplicate_card_id = int(cards["card_id"].duplicated().sum())
    card_count_by_dash = new_cards.groupby("dash_id")["card_id"].count()
    selected_branch_names = branch_names or sorted(selected_dashboards["bbk_name"].unique())

    print(f"mode: {args.mode}")
    print(f"dashboard input: {dashboard_path}")
    print(f"branch names: {', '.join(selected_branch_names)}")
    print(f"selected dashboards: {len(selected_dashboards)}")
    print(f"generated rows: {len(new_cards)}")
    print(f"wrote total rows: {len(cards)} to {output_path}")
    print(f"duplicate card_id: {duplicate_card_id}")
    print(f"cards per dashboard: {int(card_count_by_dash.min())} {int(card_count_by_dash.max())}")
    print("bbk_name:", new_cards["bbk_name"].value_counts().to_dict())


if __name__ == "__main__":
    main()
|