from __future__ import annotations

# Mock BI dashboard generator.
#
# Builds a reproducible table of fake dashboard metadata (names, folder
# routes, owning departments, visit statistics) from the static catalogues
# below and writes it to a parquet file, either replacing the file or
# appending to the rows already stored in it.

import argparse
import hashlib
import random
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd

ROOT = Path(__file__).resolve().parents[1]
OUTPUT_PATH = ROOT / "data" / "dashboard.parquet"
RANDOM_SEED = 20260506

# Branch display name -> branch id written to the ``bbk_id`` column.
BRANCH_ID_MAP = {
    "总行": "00100",
    "北京分行": "00110",
    "广东分行": "00120",
}

# Branch display name -> short code written to the ``dm_nm`` column.
BRANCH_DM_MAP = {
    "总行": "zh",
    "北京分行": "bjfh",
    "广东分行": "gdfh",
}

DEFAULT_BRANCH_NAMES = ["总行", "北京分行", "广东分行"]

# One entry per business domain: the three department levels recorded on each
# row, the folder name used in generated routes, and the dashboard topics.
DOMAIN_CONFIGS = [
    {
        "domain": "零售",
        "level1": "零售金融部",
        "level2": "经营分析室",
        "level3": "客户经营团队",
        "topics": [
            "零售客户经营总览",
            "高价值客户增长分析",
            "私人银行客户资产监测",
            "私钻客户AUM提升分析",
            "财富客户资产配置看板",
            "养老金客户经营跟踪",
            "代发客群转化跟踪",
            "零售存款日均分析",
            "储蓄存款客群结构分析",
            "零售贷款投放监控",
            "个人住房贷款质量看板",
            "消费贷获客转化分析",
            "汽车分期贷款经营分析",
            "理财产品销售分析",
            "基金产品持仓透视",
            "保险产品销售监测",
            "客户资产提升作战图",
            "网点零售业绩排名",
            "MGM获客转化分析",
        ],
        "folder": "零售数据门户",
    },
    {
        "domain": "对公",
        "level1": "公司金融部",
        "level2": "公司客户经营室",
        "level3": "对公经营分析团队",
        "topics": [
            "对公客户经营总览",
            "战略客户贡献分析",
            "机构客户综合贡献分析",
            "上市公司客群经营看板",
            "对公存款增长看板",
            "单位活期存款结构分析",
            "保证金存款监测",
            "代发业务拓展监测",
            "对公信贷投放监控",
            "项目融资投放分析",
            "小微企业贷款分析",
            "供应链金融业务看板",
            "信用证业务分析",
            "保函业务风险监测",
            "票据贴现经营分析",
            "现金管理客户活跃分析",
            "交易银行客户活跃分析",
            "公司客户流失预警",
        ],
        "folder": "公司金融数据门户",
    },
    {
        "domain": "信用卡",
        "level1": "信用卡中心",
        "level2": "经营管理部",
        "level3": "数据分析团队",
        "topics": [
            "信用卡新增客户分析",
            "信用卡交易额监测",
            "信用卡分期业务看板",
            "信用卡账单分期转化分析",
            "信用卡逾期风险预警",
            "信用卡活跃客户经营",
            "信用卡渠道获客分析",
            "信用卡额度使用分析",
            "信用卡客群画像看板",
            "信用卡权益活动效果分析",
            "信用卡商户交易监测",
            "信用卡睡眠客户唤醒看板",
        ],
        "folder": "信用卡经营分析",
    },
    {
        "domain": "风险",
        "level1": "风险管理部",
        "level2": "组合风险管理室",
        "level3": "模型监测团队",
        "topics": [
            "信贷资产质量监测",
            "不良贷款迁徙分析",
            "逾期客户风险预警",
            "关注类贷款压降监控",
            "拨备覆盖率经营看板",
            "重点行业风险排查",
            "授信集中度监控",
            "贷后检查进度跟踪",
            "资产分类变动分析",
            "抵质押品价值重估分析",
            "风险预警信号处置看板",
            "大额风险暴露监测",
        ],
        "folder": "风险管理驾驶舱",
    },
    {
        "domain": "运营",
        "level1": "运营管理部",
        "level2": "运营监控室",
        "level3": "流程管理团队",
        "topics": [
            "网点运营效率看板",
            "柜面业务量监测",
            "远程银行服务分析",
            "账户开立质量监测",
            "企业账户年检进度监控",
            "支付结算业务分析",
            "反洗钱可疑交易跟踪",
            "客户投诉处理看板",
            "运营风险事件监控",
            "集中作业处理时效分析",
            "现金库存与调拨监测",
            "电子回单服务分析",
        ],
        "folder": "运营管理专区",
    },
    {
        "domain": "财务",
        "level1": "计划财务部",
        "level2": "管理会计室",
        "level3": "财务分析团队",
        "topics": [
            "分行利润贡献分析",
            "FTP收支测算看板",
            "费用预算执行监控",
            "中间业务收入分析",
            "净利息收入贡献分析",
            "资本占用收益分析",
            "经营计划完成率看板",
            "资产负债结构分析",
            "税务成本监测看板",
            "经济利润EVA分析",
            "分产品收益率分析",
            "管理会计分摊结果看板",
        ],
        "folder": "财务管理驾驶舱",
    },
    {
        "domain": "渠道",
        "level1": "网络经营服务部",
        "level2": "渠道管理室",
        "level3": "数字渠道团队",
        "topics": [
            "手机银行活跃分析",
            "网银交易监测",
            "网点客流热力分析",
            "自助设备运行看板",
            "远程渠道转化分析",
            "企业网银客户经营",
            "开放银行接口监测",
            "渠道协同营销看板",
            "数字人民币交易分析",
            "小程序渠道转化分析",
            "线上预约到店分析",
            "渠道客户体验监测",
        ],
        "folder": "渠道经营分析",
    },
    {
        "domain": "普惠",
        "level1": "普惠金融部",
        "level2": "普惠经营管理室",
        "level3": "小微客户团队",
        "topics": [
            "普惠贷款投放监控",
            "小微客户增长看板",
            "普惠首贷户拓展分析",
            "涉农贷款经营监测",
            "科技型企业贷款分析",
            "普惠风险补偿跟踪",
            "普惠延期还本付息监测",
            "科创小微客户经营分析",
            "个体工商户贷款看板",
            "普惠贷款利率定价分析",
        ],
        "folder": "普惠金融专区",
    },
    {
        "domain": "金融市场",
        "level1": "金融市场部",
        "level2": "投资交易管理室",
        "level3": "市场风险与经营分析团队",
        "topics": [
            "债券投资组合分析",
            "同业负债成本监测",
            "资金头寸预测看板",
            "外汇交易损益分析",
            "衍生品估值监测",
            "票据转贴现业务分析",
            "理财投资资产穿透看板",
            "金融市场限额占用监控",
            "市场价格波动预警",
        ],
        "folder": "金融市场经营专区",
    },
    {
        "domain": "国际业务",
        "level1": "国际业务部",
        "level2": "跨境金融管理室",
        "level3": "国际结算分析团队",
        "topics": [
            "跨境结算业务分析",
            "贸易融资投放监控",
            "进口信用证业务看板",
            "出口托收业务跟踪",
            "外汇存款结构分析",
            "结售汇客户贡献分析",
            "跨境人民币业务监测",
            "国际业务风险预警",
        ],
        "folder": "国际业务分析专区",
    },
    {
        "domain": "合规",
        "level1": "法律合规部",
        "level2": "反洗钱与制裁合规管理团队",
        "level3": "合规数据应用室",
        "topics": [
            "受益所有人信息缺失监测",
            "客户尽职调查进度看板",
            "反洗钱名单命中分析",
            "可疑交易报告质效分析",
            "员工异常行为排查",
            "监管报送质量监控",
            "合规检查问题整改跟踪",
            "制裁筛查处理时效分析",
        ],
        "folder": "合规管理专区",
    },
    {
        "domain": "人力",
        "level1": "人力资源部",
        "level2": "绩效薪酬管理室",
        "level3": "人力数据分析团队",
        "topics": [
            "机构人均产能分析",
            "客户经理绩效看板",
            "支行奖金分配测算",
            "岗位编制与人员缺口分析",
            "员工培训完成率监测",
            "人才盘点与梯队建设看板",
            "一线人员工作量分析",
        ],
        "folder": "人力资源分析专区",
    },
]

# Suffix appended to a topic to form a dashboard display name; the empty
# string yields the bare topic.
VERSION_SUFFIXES = [
    "",
    " V1.0",
    " V2.0",
    " V3.0",
    " V4.0",
    "(机构版)",
    "(管理端)",
    "(支行版)",
    "(客户经理版)",
    "(测试版)",
    "月报",
    "日报",
    "周报",
    "快报",
    "大屏",
]


def make_id(prefix: str, ordinal: int) -> str:
    """Return a deterministic 24-character hex id for ``prefix`` and ``ordinal``.

    The id is the first 24 hex digits of the MD5 digest of
    ``"{prefix}-{ordinal}-{RANDOM_SEED}"``, so the same inputs always map to
    the same id.
    """
    raw = f"{prefix}-{ordinal}-{RANDOM_SEED}".encode("utf-8")
    return hashlib.md5(raw).hexdigest()[:24]


def parse_branch_names(raw_branch_names: list[str] | None) -> list[str]:
    """Normalize ``--branch-name`` values into a de-duplicated list of names.

    Each raw value may hold several comma-separated names. Surrounding
    whitespace is stripped, blanks are dropped, and first-seen order is kept.

    Args:
        raw_branch_names: Values collected from the command line, or ``None``
            when the option was not given.

    Returns:
        The requested branch names, or a fresh copy of
        ``DEFAULT_BRANCH_NAMES`` when nothing usable was supplied.

    Raises:
        ValueError: If any name is not a key of ``BRANCH_DM_MAP``.
    """
    if not raw_branch_names:
        # Return a copy so callers cannot mutate the module-level default.
        return list(DEFAULT_BRANCH_NAMES)

    branch_names: list[str] = []
    for raw_value in raw_branch_names:
        for branch_name in raw_value.split(","):
            branch_name = branch_name.strip()
            if branch_name and branch_name not in branch_names:
                branch_names.append(branch_name)

    if not branch_names:
        # Only blanks/commas were given. make_dashboards() would substitute the
        # default pool for an empty one anyway, so report the defaults here to
        # keep the printed branch list consistent with the generated data.
        return list(DEFAULT_BRANCH_NAMES)

    unknown_branch_names = [name for name in branch_names if name not in BRANCH_DM_MAP]
    if unknown_branch_names:
        supported = "、".join(BRANCH_DM_MAP)
        unknown = "、".join(unknown_branch_names)
        raise ValueError(f"unsupported branch name: {unknown}. Supported branch names: {supported}")
    return branch_names


def build_branch_pool(branch_names: list[str]) -> list[tuple[str, str, str]]:
    """Return ``(branch id, branch name, short code)`` tuples for ``branch_names``.

    Raises:
        KeyError: If a name is missing from ``BRANCH_ID_MAP`` or ``BRANCH_DM_MAP``.
    """
    return [(BRANCH_ID_MAP[name], name, BRANCH_DM_MAP[name]) for name in branch_names]


def choose_branch(
    rng: random.Random,
    domain: str,
    branch_pool: list[tuple[str, str, str]],
) -> tuple[str, str, str]:
    """Pick the branch that owns a dashboard of the given ``domain``.

    When the pool contains "总行" and the domain is 风险, 财务 or 信用卡, that
    entry is returned outright with probability 0.36; in every other case the
    branch is drawn uniformly from the whole pool.
    """
    total_branch = next((branch for branch in branch_pool if branch[1] == "总行"), None)
    # Short-circuit order matters: rng.random() is consumed only when the first
    # two conditions hold, which keeps the random stream reproducible.
    if total_branch and domain in {"风险", "财务", "信用卡"} and rng.random() < 0.36:
        return total_branch
    return rng.choice(branch_pool)


def make_timestamp(rng: random.Random, start: datetime, end: datetime) -> str:
    """Return a random timestamp in ``[start, end]`` as ``%Y-%m-%d %H:%M:%S.%f``.

    A whole-second offset from ``start`` is drawn first, then the microsecond
    field is replaced with one value from a fixed list.

    Raises:
        ValueError: If ``end`` is earlier than ``start`` (empty randint range).
    """
    seconds = int((end - start).total_seconds())
    value = start + timedelta(seconds=rng.randint(0, seconds))
    microsecond = rng.choice([0, 152000, 281000, 371000, 495000, 659000, 749000, 911000])
    return value.replace(microsecond=microsecond).strftime("%Y-%m-%d %H:%M:%S.%f")


def build_folder_route(rng: random.Random, branch_name: str, config: dict[str, object], topic: str) -> str:
    """Return one randomly chosen folder path for a dashboard.

    Five candidate routes are built from the domain config, the branch name
    and the topic; one of them is picked uniformly.
    """
    roots = [
        f"根目录/{config['folder']}/{config['domain']}/{topic}",
        f"根目录/业务网仪表板/{config['domain']}条线/{branch_name}/{topic}",
        f"根目录/总行/经营管理驾驶舱/{config['folder']}/{topic}",
        f"根目录/用户开发报表区/{branch_name}/{config['folder']}/{topic}",
        f"根目录/应用市场下载目录/分析模板/{config['domain']}经营/{topic}",
    ]
    return rng.choice(roots)


def make_dashboard_name(
    rng: random.Random,
    ordinal: int,
    branch_name: str,
    topic: str,
    used_names: set[str],
) -> str:
    """Build a display name that is not yet in ``used_names`` and record it.

    The base name is ``[branch]topic[suffix]``: the suffix is drawn from
    ``VERSION_SUFFIXES`` and the branch prefix is added with probability 0.34
    for every branch except "总行". If the base name is taken, an issue marker
    based on ``ordinal`` is appended, followed by an incrementing retry index
    until the name is free.

    Side effects:
        Adds the returned name to ``used_names``.
    """
    suffix = rng.choice(VERSION_SUFFIXES)
    # rng.random() is evaluated first so one draw is consumed for every branch,
    # keeping the random stream independent of the branch chosen.
    branch_prefix = branch_name if rng.random() < 0.34 and branch_name != "总行" else ""
    base_name = f"{branch_prefix}{topic}{suffix}"

    dash_name = base_name
    if dash_name in used_names:
        dash_name = f"{base_name}(第{ordinal:03d}期)"
        retry_index = 2
        while dash_name in used_names:
            dash_name = f"{base_name}(第{ordinal:03d}-{retry_index}期)"
            retry_index += 1

    used_names.add(dash_name)
    return dash_name


def build_record(
    rng: random.Random,
    ordinal: int,
    config: dict[str, object],
    topic: str,
    branch_pool: list[tuple[str, str, str]],
    used_names: set[str],
) -> dict[str, object]:
    """Generate one dashboard row as a column-name -> value mapping.

    Args:
        rng: Random source; the order of draws below determines the output.
        ordinal: Position of the row, used for the id and for name collisions.
        config: One entry of ``DOMAIN_CONFIGS``.
        topic: Topic from ``config["topics"]`` the dashboard is about.
        branch_pool: Candidate branches as built by ``build_branch_pool``.
        used_names: Display names already taken; updated in place.

    Returns:
        A dict with the 20 columns listed in ``make_dashboards``.
    """
    bbk_id, bbk_name, dm_nm = choose_branch(rng, str(config["domain"]), branch_pool)
    dash_name = make_dashboard_name(rng, ordinal, bbk_name, topic, used_names)

    level1 = str(config["level1"])
    level2 = str(config["level2"])
    level3 = str(config["level3"])
    # The empty candidate means the department path stops at level 3.
    level4_candidates = ["数据应用室", "经营推动组", "业务支持组", "指标管理组", ""]
    level4 = rng.choice(level4_candidates)
    dept_fname_parts = ["XX银行", bbk_name, level1, level2, level3]
    if level4:
        dept_fname_parts.append(level4)

    chart_cnt = rng.randint(20, 50)
    author_cnt = rng.randint(8, 360)

    first_visit = make_timestamp(rng, datetime(2023, 1, 1), datetime(2026, 2, 28))
    # The most recent visit falls at least one day after the first visit.
    recent_start = datetime.strptime(first_visit, "%Y-%m-%d %H:%M:%S.%f") + timedelta(days=1)
    recent_end = datetime(2026, 5, 5, 23, 59, 59)
    recent_visit = make_timestamp(rng, recent_start, recent_end) if recent_start < recent_end else first_visit

    efficiency = rng.choices(["高效", "中效", "低效"], weights=[0.34, 0.46, 0.20], k=1)[0]
    # Bounds of the three-month visit count for each efficiency rating.
    visit_ranges = {
        "高效": (360, 6800),
        "中效": (80, 1200),
        "低效": (0, 180),
    }
    visit_low, visit_high = visit_ranges[efficiency]

    # The dict entries are evaluated top to bottom, so the remaining random
    # draws happen in exactly this order.
    return {
        "dash_id": make_id("dash", ordinal),
        "dash_dsply_name": dash_name,
        "folder_all_route": build_folder_route(rng, bbk_name, config, topic),
        "build_main": rng.choices(["总行建设", "分行自建"], weights=[0.48, 0.52], k=1)[0],
        "dept_cls": rng.choices(
            ["业务部门", "信息技术部", "风险合规部门", "管理部门"],
            weights=[0.66, 0.18, 0.09, 0.07],
            k=1,
        )[0],
        "dash_rat": efficiency,
        "bbk_name": bbk_name,
        "bbk_id": bbk_id,
        "dept_fname": "/".join(dept_fname_parts),
        "level1_dept_name": level1,
        "level2_dept_name": level2,
        "level3_dept_name": level3,
        "level4_dept_name": level4,
        "chart_cnt": chart_cnt,
        "be_author_cnt": author_cnt,
        "fir_be_visit_tm": first_visit,
        "recnt_be_visit_tm": recent_visit,
        "m3_vis_cnt": rng.randint(visit_low, visit_high),
        "mon_add_idf": rng.choices(["是", "否"], weights=[0.08, 0.92], k=1)[0],
        "dm_nm": dm_nm,
    }


def make_dashboards(
    target_count: int = 200,
    start_ordinal: int = 1,
    branch_pool: list[tuple[str, str, str]] | None = None,
    used_names: set[str] | None = None,
) -> pd.DataFrame:
    """Generate ``target_count`` mock dashboard rows.

    Args:
        target_count: Number of rows to produce; values <= 0 give an empty frame.
        start_ordinal: Ordinal of the first generated row. It also offsets the
            random seed, so different starting points draw different data.
        branch_pool: Candidate branches; a missing or empty pool falls back to
            the pool built from ``DEFAULT_BRANCH_NAMES``.
        used_names: Display names already taken; updated in place when given.

    Returns:
        A DataFrame with one row per dashboard and a fixed column order.
    """
    rng = random.Random(RANDOM_SEED + start_ordinal - 1)
    branch_pool = branch_pool or build_branch_pool(DEFAULT_BRANCH_NAMES)
    used_names = used_names if used_names is not None else set()
    records: list[dict[str, object]] = []

    topic_pairs: list[tuple[dict[str, object], str]] = [
        (config, topic) for config in DOMAIN_CONFIGS for topic in config["topics"]
    ]

    # Cycle through every (domain, topic) pair in a freshly shuffled order until
    # enough rows exist; later passes reuse topics and rely on name de-duplication.
    while len(records) < target_count:
        rng.shuffle(topic_pairs)
        for config, topic in topic_pairs:
            records.append(
                build_record(
                    rng,
                    start_ordinal + len(records),
                    config,
                    topic,
                    branch_pool,
                    used_names,
                )
            )
            if len(records) >= target_count:
                break

    columns = [
        "dash_id",
        "dash_dsply_name",
        "folder_all_route",
        "build_main",
        "dept_cls",
        "dash_rat",
        "bbk_name",
        "bbk_id",
        "dept_fname",
        "level1_dept_name",
        "level2_dept_name",
        "level3_dept_name",
        "level4_dept_name",
        "chart_cnt",
        "be_author_cnt",
        "fir_be_visit_tm",
        "recnt_be_visit_tm",
        "m3_vis_cnt",
        "mon_add_idf",
        "dm_nm",
    ]
    return pd.DataFrame(records, columns=columns)


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the generator."""
    parser = argparse.ArgumentParser(description="Generate mock BI dashboard parquet data.")
    parser.add_argument(
        "--mode",
        choices=["overwrite", "append"],
        default="overwrite",
        help="overwrite replaces the parquet file; append inserts generated rows into the existing parquet file.",
    )
    parser.add_argument(
        "--count",
        type=int,
        default=200,
        help="number of dashboard rows to generate in this run.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=OUTPUT_PATH,
        help="target parquet path.",
    )
    parser.add_argument(
        "--branch-name",
        action="append",
        help=(
            "branch names to generate, separated by comma or passed repeatedly. "
            "Defaults to 总行,北京分行,广东分行."
        ),
    )
    return parser.parse_args()


def main() -> None:
    """Generate dashboards per the CLI options, write the parquet file, print a summary.

    Raises:
        ValueError: If ``--count`` is not positive or a branch name is unsupported.
    """
    args = parse_args()
    if args.count <= 0:
        raise ValueError("--count must be a positive integer")

    output_path = args.output.resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    branch_names = parse_branch_names(args.branch_name)
    branch_pool = build_branch_pool(branch_names)

    existing = pd.DataFrame()
    if args.mode == "append" and output_path.exists():
        existing = pd.read_parquet(output_path)

    # Seed the name registry with stored names so appended rows stay unique.
    used_names = set(existing["dash_dsply_name"]) if not existing.empty else set()
    new_dashboards = make_dashboards(
        args.count,
        # Continue numbering after the stored rows so new ids do not repeat.
        start_ordinal=len(existing) + 1,
        branch_pool=branch_pool,
        used_names=used_names,
    )
    dashboards = pd.concat([existing, new_dashboards], ignore_index=True) if not existing.empty else new_dashboards
    dashboards.to_parquet(output_path, index=False)

    duplicate_count = int(dashboards["dash_id"].duplicated().sum())
    duplicate_name_count = int(dashboards["dash_dsply_name"].duplicated().sum())
    print(f"mode: {args.mode}")
    print(f"branch names: {', '.join(branch_names)}")
    print(f"generated rows: {len(new_dashboards)}")
    print(f"wrote total rows: {len(dashboards)} to {output_path}")
    print(f"duplicate dash_id: {duplicate_count}")
    print(f"duplicate dash_dsply_name: {duplicate_name_count}")
    print(f"columns: {', '.join(dashboards.columns)}")
    print("dash_rat:", dashboards["dash_rat"].value_counts().to_dict())
    print("chart_cnt:", int(dashboards["chart_cnt"].min()), int(dashboards["chart_cnt"].max()))
    print("level1_dept_name:", dashboards["level1_dept_name"].value_counts().to_dict())


if __name__ == "__main__":
    main()