import ast
import json
import re
import sys
import traceback

import pandas as pd

# Aggregation code -> SQL aggregate function. A value containing "{}" is a
# template that receives the field expression (COUNT DISTINCT).
AGGREGATION_MAP = {
    'SUM': 'SUM',
    'AVG': 'AVG',
    'CNT': 'COUNT',
    'MAX': 'MAX',
    'MIN': 'MIN',
    'CNT_DISTINCT': 'COUNT(DISTINCT {})'
}

# Aggregation code -> suffix appended to the default column alias.
AGGREGATION_SUFFIX_MAP = {
    'SUM': '求和',
    'AVG': '均值',
    'CNT': '计数',
    'MAX': '最大值',
    'MIN': '最小值',
    'CNT_DISTINCT': '去重计数'
}

# Filter operators.
#   val_nums: number of values the template consumes (9 means "any number").
#   quote:    when False, get_format_args must NOT wrap string values in
#             quotes because the template already contains them (LIKE family).
FILTER_OPERATOR_MAP = {
    'BT': {"val_nums": 2, 'template': "{field} BETWEEN {value_1} AND {value_2}"},
    'CLOSE_BT_OPEN': {"val_nums": 2, 'template': "{field} >= {value_1} AND {field} < {value_2}"},
    'EQ': {"val_nums": 1, 'template': "{field} = {value}"},
    'GE': {"val_nums": 1, 'template': "{field} >= {value}"},
    'GT': {"val_nums": 1, 'template': "{field} > {value}"},
    'IN': {"val_nums": 9, 'template': "{field} IN ({values})"},
    'IS_NULL': {"val_nums": 0, 'template': "{field} IS NULL"},
    'LE': {"val_nums": 1, 'template': "{field} <= {value}"},
    'LT': {"val_nums": 1, 'template': "{field} < {value}"},
    'NE': {"val_nums": 1, 'template': "{field} != {value}"},
    'NI': {"val_nums": 9, 'template': "{field} NOT IN ({values})"},
    'NOT_NULL': {"val_nums": 0, 'template': "{field} IS NOT NULL"},
    'OPEN_BT_CLOSE': {"val_nums": 2, 'template': "{field} > {value_1} AND {field} <= {value_2}"},
    'OPEN_BT_OPEN': {"val_nums": 2, 'template': "{field} > {value_1} AND {field} < {value_2}"},
    'CONTAINS': {"val_nums": 1, 'quote': False, 'template': "{field} LIKE '%{value}%'"},
    'NOT_CONTAINS': {"val_nums": 1, 'quote': False, 'template': "{field} NOT LIKE '%{value}%'"},
    'STARTS_WITH': {"val_nums": 1, 'quote': False, 'template': "{field} LIKE '{value}%'"},
    'NOT_STARTS_WITH': {"val_nums": 1, 'quote': False, 'template': "{field} NOT LIKE '{value}%'"},
    'ENDS_WITH': {"val_nums": 1, 'quote': False, 'template': "{field} LIKE '%{value}'"},
    'NOT_ENDS_WITH': {"val_nums": 1, 'quote': False, 'template': "{field} NOT LIKE '%{value}'"},
    'CUSTOM': 'CUSTOM',
    'SPARK_EXPR': 'SPARK_EXPR'
}

# Identifier quoting.
IDENTIFIER_QUOTE = '`'
QUOTE_FLAG = True

# Matches "max(<expr>) over (<window>)". Only the whole match is used by the
# callers; the groups are named purely for readability.
WINDOW_MAX_OVER_PATTERN = re.compile(
    r"max\s*\(\s*(?P<expr>.*?)\s*\)\s*over\s*\(\s*(?P<window>.*?)\s*\)",
    flags=re.IGNORECASE | re.DOTALL,
)

# Matches a call to a known aggregate function, i.e. the name followed by "(".
AGGREGATION_PATTERN = re.compile(
    r"\b(sum|avg|count|max|min|stddev|variance|collect_list|collect_set|percentile|percentile_approx)\s*\(",
    flags=re.IGNORECASE,
)

# First day of the current quarter. Doubled braces survive str.format() as the
# literal placeholder {today}.
_QUARTER_START = (
    "concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01' "
    "when quarter('{{today}}') = 2 then '-04-01' "
    "when quarter('{{today}}') = 3 then '-07-01' "
    "when quarter('{{today}}') = 4 then '-10-01' end)"
)

# Relative-date ("adverb") filters. After .format(field=...) each expression
# still contains a literal {today} placeholder.
ADV_FILTER_EXP_MAP = {
    'TODAY': "{field} = '{{today}}'",
    'YESTERDAY': "{field} = date_sub('{{today}}', 1)",
    'DAY_BEFORE_YESTERDAY': "{field} = date_sub('{{today}}', 2)",
    'LAST_14_DAY': "{field} between date_sub('{{today}}', 13) and '{{today}}'",
    'LAST_1_YEAR': "{field} between date_sub('{{today}}', 364) and '{{today}}'",
    'LAST_30_DAY': "{field} between date_sub('{{today}}', 29) and '{{today}}'",
    'LAST_7_DAY': "{field} between date_sub('{{today}}', 6) and '{{today}}'",
    'LAST_90_DAY': "{field} between date_sub('{{today}}', 89) and '{{today}}'",
    'LAST_MONTH': "{field} between date_sub(add_months('{{today}}', -1), day('{{today}}') - 1) and date_sub('{{today}}', day('{{today}}'))",
    'LAST_WEEK': "{field} between date_sub('{{today}}', case when dayofweek('{{today}}') = 1 then 13 else dayofweek('{{today}}')+5 end) and date_sub('{{today}}', case when dayofweek('{{today}}') = 1 then 7 else dayofweek('{{today}}')-1 end)",
    'MONTH_BEFORE_LAST_MONTH': "{field} between date_sub(add_months('{{today}}', -2), day(add_months('{{today}}', -2))-1) and date_sub(add_months('{{today}}', -1), day(add_months('{{today}}', -1)))",
    'MONTH_TO_DAY': "{field} between date_sub('{{today}}', day('{{today}}')-1) and '{{today}}'",
    'MONTH_TO_YESTERDAY': "{field} between date_sub('{{today}}', day('{{today}}')-1) and date_sub('{{today}}', 1)",
    'QUARTER_TO_DAY': "{field} between " + _QUARTER_START + " and '{{today}}'",
    'QUARTER_TO_YESTERDAY': "{field} between " + _QUARTER_START + " and date_sub('{{today}}', 1)",
    'WEEK_TO_DAY': "{field} between date_sub('{{today}}', (dayofweek('{{today}}')+5)%7) and '{{today}}'",
    'WEEK_TO_YESTERDAY': "{field} between date_sub('{{today}}', (dayofweek('{{today}}')+5)%7) and date_sub('{{today}}', 1)",
    'YEAR_TO_DAY': "{field} between concat(year('{{today}}'), '-01-01') and '{{today}}'",
    'YEAR_TO_LAST_MONTH': "{field} between concat(year('{{today}}'), '-01-01') and date_sub('{{today}}', day('{{today}}'))",
    'YEAR_TO_LAST_QUARTER': "{field} between concat(year('{{today}}'), '-01-01') and date_sub(" + _QUARTER_START + ", 1)",
    'YEAR_TO_YESTERDAY': "{field} between concat(year('{{today}}'), '-01-01') and date_sub('{{today}}', 1)",
}

# Built-in date-part conversions, keyed by the suffix of "<fieldId>_<part>".
PARTIAL_DATE_EXPRESSION = {
    'day': "`{old}` as `{new}`",
    'month': "concat(year(`{old}`), '-', lpad(month(`{old}`), 2, '0')) as `{new}`",
    'year': "year(`{old}`) as `{new}`",
    'quarter': "concat(year(`{old}`), '-S', quarter(`{old}`)) as `{new}`",
    'week': "weekofyear(`{old}`) as `{new}`",
    'dayofweek': "dayofweek(`{old}`)-1 as `{new}`",
    'hour': "hour(`{old}`)-8 as `{new}`",
    # NOTE(review): "-8" on minutes looks like a copy of the hour offset;
    # left unchanged, confirm the intended behaviour.
    'minute': "minute(`{old}`)-8 as `{new}`",
}


def get_added_fields_info(added_fields_df):
    """Index the card's calculated fields by both field id and field name.

    Each row's ``calc_field_logic`` column is parsed as JSON. Rows that fail
    to parse are reported on stdout and skipped.

    Returns:
        dict: ``{field_id: {"field_name", "calculation"},
        field_name: {"field_id", "calculation"}}``; empty for an empty frame.
    """
    if added_fields_df.empty:
        return {}
    added_fields_info = {}
    for _, row in added_fields_df.iterrows():
        raw_logic = row['calc_field_logic']
        try:
            content = json.loads(raw_logic)
        except (ValueError, TypeError):
            print(f'ERROR: 新增字段解析错误: {raw_logic}')
            continue
        field_name = content['name']
        field_id = content['fdId']
        added_fields_info[field_id] = {"field_name": field_name, 'calculation': content}
        added_fields_info[field_name] = {"field_id": field_id, 'calculation': content}
    return added_fields_info


def get_fid_name_map(field_def_df):
    """Return ``{field_id: field_name}`` built from the dataset field frame."""
    field_id_name = {}
    for _, row in field_def_df.iterrows():
        field_id_name[row['field_id']] = row['field_name']
    return field_id_name


def get_fields_rename_map(field_info):
    """Return ``{name: alias}`` for the dimensions and metrics in *field_info*.

    *field_info* is a JSON string; anything that does not parse into a JSON
    object yields an empty map.
    """
    ret = {}
    try:
        tmp_map = json.loads(field_info)
    except (ValueError, TypeError):
        return ret
    if not isinstance(tmp_map, dict):
        return ret
    for section in ("dimensions", "metrics"):
        entries = tmp_map.get(section)
        if entries and entries != 'null':
            for one_map in entries:
                ret[one_map["name"]] = one_map["alias"]
    return ret


def resolve_calculation_formula(formula, calculation_fields, visited=None):
    """Recursively inline nested calculated fields referenced as ``[name]``.

    References that are not calculated fields, and calculated fields whose
    formula is a consolidation definition, are left untouched.

    Raises:
        ValueError: when the calculated fields reference each other cyclically.
    """
    if not formula:
        return formula
    if visited is None:
        visited = set()

    def replace_calculation_field(match):
        field_key = match.group(1).strip()
        field_def = calculation_fields.get(field_key)
        if not field_def:
            return match.group(0)
        field_id = field_def.get("field_id") or field_key
        if field_id in visited:
            raise ValueError(f"计算字段存在循环引用: {field_key}")
        nested_formula = field_def["calculation"].get("formula", "")
        if "consolidation" in nested_formula:
            return match.group(0)
        resolved = resolve_calculation_formula(nested_formula, calculation_fields, visited | {field_id})
        return f"({resolved})"

    return re.sub(r"\[([^\[\]]+)\]", replace_calculation_field, formula)


def extract_formula_field_refs(formula):
    """Return the set of ``[field]`` references in *formula*.

    Purely numeric brackets such as ``[2]`` are ignored (subscript access).
    """
    if not formula:
        return set()
    refs = set()
    for match in re.findall(r"\[([^\[\]]+)\]", formula):
        field_name = match.strip()
        if field_name and not re.fullmatch(r"\d+", field_name):
            refs.add(field_name)
    return refs


def collect_formula_dependencies(formula, calculation_fields, visited=None):
    """Drill through calculated fields and collect the referenced field names
    that are not themselves calculated fields.

    Raises:
        ValueError: when the calculated fields reference each other cyclically.
    """
    if not formula:
        return set()
    if visited is None:
        visited = set()

    if "consolidation" in formula:
        consolidation_dict = json.loads(formula)["consolidation"]
        source_name = consolidation_dict.get("sourceName")
        if not source_name:
            return set()
        source_field = calculation_fields.get(source_name)
        if not source_field:
            return {source_name}
        source_field_id = source_field.get("field_id") or source_name
        if source_field_id in visited:
            raise ValueError(f"计算字段存在循环引用: {source_name}")
        nested_formula = source_field["calculation"].get("formula", "")
        return collect_formula_dependencies(nested_formula, calculation_fields, visited | {source_field_id})

    dependencies = set()
    for field_name in extract_formula_field_refs(formula):
        field_def = calculation_fields.get(field_name)
        if not field_def:
            dependencies.add(field_name)
            continue
        field_id = field_def.get("field_id") or field_name
        if field_id in visited:
            raise ValueError(f"计算字段存在循环引用: {field_name}")
        nested_formula = field_def["calculation"].get("formula", "")
        dependencies.update(collect_formula_dependencies(nested_formula, calculation_fields, visited | {field_id}))
    return dependencies


def collect_filter_dependencies(filter_relation_str, calculation_fields):
    """Collect the fields needed by the filter conditions.

    Filter formulas may depend on extra fields, which must be part of the
    WITH base columns. An unparsable filter string yields an empty set.
    """
    if not filter_relation_str or filter_relation_str == "[]":
        return set()
    dependencies = set()
    try:
        raw_conditions = json.loads(filter_relation_str)
    except Exception:
        return dependencies

    for cond_dict in raw_conditions:
        field_name = cond_dict.get("name")
        if field_name:
            dependencies.add(field_name)
        formula = cond_dict.get("formula")
        if formula:
            dependencies.update(collect_formula_dependencies(formula, calculation_fields))
        consolidation = cond_dict.get("consolidation")
        if consolidation:
            source_name = consolidation.get("sourceName")
            if source_name:
                source_field = calculation_fields.get(source_name)
                if not source_field:
                    dependencies.add(source_name)
                else:
                    nested_formula = source_field["calculation"].get("formula", "")
                    dependencies.update(
                        collect_formula_dependencies(
                            nested_formula,
                            calculation_fields,
                            {source_field.get("field_id") or source_name},
                        )
                    )
    return dependencies


def collect_with_base_fields(
    all_field_names,
    measure_fields,
    new_date_fields,
    new_dimension_fields,
    dataset_fid_name_map,
    added_fields_info,
    filter_relation_str,
):
    """Return the dataset fields the WITH sub-query must select.

    Only the fields needed by the later SELECT / WHERE / ORDER BY are kept,
    so the temporary table does not select the whole dataset.
    """
    dataset_field_names = set(dataset_fid_name_map.values())
    required_fields = {name for name in all_field_names if name in dataset_field_names}

    for fid, _ in new_date_fields:
        old_fid = fid.split('_')[0]
        if old_fid in dataset_fid_name_map:
            required_fields.add(dataset_fid_name_map[old_fid])
        elif old_fid in added_fields_info:
            formula = added_fields_info[old_fid]["calculation"].get("formula", "")
            required_fields.update(collect_formula_dependencies(formula, added_fields_info, {old_fid}))

    for fid, _ in new_dimension_fields:
        formula = added_fields_info[fid]["calculation"].get("formula", "")
        required_fields.update(collect_formula_dependencies(formula, added_fields_info, {fid}))

    for field in measure_fields:
        if field not in added_fields_info:
            continue
        field_id = added_fields_info[field]["field_id"]
        formula = added_fields_info[field]["calculation"].get("formula", "")
        required_fields.update(collect_formula_dependencies(formula, added_fields_info, {field_id}))

    required_fields.update(collect_filter_dependencies(filter_relation_str, added_fields_info))
    return required_fields


def resolve_window_expression_fields(expression, calculation_fields):
    """Replace back-quoted calculated fields in *expression* by their formula.

    A window function that references a calculated field must be expanded,
    otherwise the window column generated in WITH would depend on an alias
    that does not exist there.
    """
    if not expression:
        return expression

    def replace_identifier(match):
        field_name = match.group(1).strip()
        field_def = calculation_fields.get(field_name)
        if not field_def:
            return match.group(0)
        field_id = field_def.get("field_id") or field_name
        formula = field_def["calculation"].get("formula", "")
        if "consolidation" in formula:
            resolved_formula = get_consolidation_field(json.loads(formula)["consolidation"])
        else:
            resolved_formula = resolve_calculation_formula(formula, calculation_fields, {field_id})
            resolved_formula = quote_identifier(resolved_formula, formula=True)
        return f"({resolved_formula})"

    return re.sub(r"`([^`]+)`", replace_identifier, expression)


def rewrite_window_max_over(expression, calculation_fields, window_alias_map, window_select_expressions):
    """Replace every ``max(...) over (...)`` in *expression* by a column alias.

    Hive / SparkSQL do not allow window functions directly in WHERE/HAVING,
    so each one is moved into the WITH sub-query.

    Side effects:
        *window_alias_map* gains ``normalized expression -> alias`` entries and
        *window_select_expressions* gains the ``<expr> AS <alias>`` items.
    """
    if not expression:
        return expression

    def replace_window(match):
        raw_expression = resolve_window_expression_fields(match.group(0).strip(), calculation_fields)
        normalized_expression = re.sub(r"\s+", " ", raw_expression).lower()
        alias = window_alias_map.get(normalized_expression)
        if not alias:
            alias = f"window_max_over_{len(window_alias_map) + 1}"
            window_alias_map[normalized_expression] = alias
            window_select_expressions.append(f"{raw_expression} AS {quote_identifier(alias)}")
        return quote_identifier(alias)

    return WINDOW_MAX_OVER_PATTERN.sub(replace_window, expression)


def build_with_part(
    new_date_fields,
    new_dimension_fields,
    dataset_fid_name_map,
    added_fields_info,
    dataset_id,
    required_base_fields,
    extra_with_expressions=None,
):
    """Build the ``WITH tmp as (...)`` prefix.

    The sub-query selects the required base columns, the derived date parts,
    the calculated dimensions and any *extra_with_expressions* (window columns
    extracted from WHERE).

    Raises:
        ValueError: for an unknown date field or an unknown date part.
    """
    # Derived columns replace a base column of the same name.
    override_field_names = set()
    for _, new_name in new_date_fields:
        override_field_names.add(new_name)
    for fid, _ in new_dimension_fields:
        override_field_names.add(added_fields_info[fid]["field_name"])

    base_columns = []
    seen_columns = set()
    for field_name in dataset_fid_name_map.values():
        if field_name in override_field_names or field_name in seen_columns:
            continue
        # Keep only the raw fields the dependency collection marked as needed.
        if field_name not in required_base_fields:
            continue
        seen_columns.add(field_name)
        base_columns.append(quote_identifier(field_name))

    with_expressions = []
    for fid, new_name in new_date_fields:
        old_fid, partial_date = fid.split('_')
        if old_fid in dataset_fid_name_map:
            old_name = dataset_fid_name_map[old_fid]
        elif old_fid in added_fields_info:
            # The calculated field's formula with its brackets stripped is
            # used as the source column name.
            old_name = added_fields_info[old_fid]["calculation"]["formula"].replace("[", "").replace("]", '')
        else:
            raise ValueError(f"字段 {fid} {new_name} 不存在")

        tmp_part = PARTIAL_DATE_EXPRESSION.get(partial_date, None)
        if tmp_part:
            with_expressions.append(tmp_part.format(old=old_name, new=new_name))
        else:
            raise ValueError(f"日期转换方式 {partial_date} 不存在")

    for fid, new_name in new_dimension_fields:
        field_def = added_fields_info[fid]
        new_name = field_def["field_name"]
        formula = field_def["calculation"]["formula"]
        if "consolidation" in formula:
            consolidation_dict = json.loads(formula)["consolidation"]
            tmp_part = get_consolidation_field(consolidation_dict)
            tmp_part += f" AS `{new_name}`"
        else:
            # Inline nested calculated fields before quoting.
            formula = resolve_calculation_formula(formula, added_fields_info, {fid})
            tmp_part = quote_identifier(formula, formula=True) + f" AS `{new_name}`"
        with_expressions.append(tmp_part)

    if extra_with_expressions:
        # Mostly the window-function columns extracted from WHERE.
        with_expressions.extend(extra_with_expressions)

    select_parts = base_columns + with_expressions
    sql_part = "WITH tmp as (\nSELECT " + ",\n".join(select_parts)
    sql_part += f"\nFROM {quote_identifier(str(dataset_id))}\n)"
    return sql_part


def process_measure_fields(measure_fields, measure_aggs, calculation_fields, card_id, card_name):
    """Pair every measure field with an aggregation code.

    Calculated fields carry no entry in *measure_aggs*; they receive the
    placeholder ``"NUL"`` (no aggregation wrapper). *measure_aggs* is not
    modified.

    Returns:
        tuple: ``(fields, aggs, is_aggregated, agg_flag)`` where *fields* are
        quoted identifiers or resolved formulas, *is_aggregated* marks
        calculated fields declared as aggregated, and *agg_flag* tells the
        caller a GROUP BY is needed. All lists are empty when the
        configuration is inconsistent.
    """
    remaining_aggs = list(measure_aggs)

    # Fewer measure fields than aggregations: invalid.
    if len(measure_fields) < len(remaining_aggs):
        print(f"警告: 卡片 {card_id} {card_name}: 数值字段数量小于聚合函数数量,不合法")
        print(f"警告: 卡片 {card_id} {card_name}: 不添加任何数值字段.")
        return [], [], [], False

    # More measure fields than aggregations: the surplus must be calculated fields.
    if len(measure_fields) > len(remaining_aggs):
        num_cals = sum(1 for field in measure_fields if field in calculation_fields)

        # No calculated field at all: pad with NULL aggregations.
        if num_cals == 0:
            remaining_aggs.extend(['NULL'] * (len(measure_fields) - len(remaining_aggs)))
            return (
                [quote_identifier(field) for field in measure_fields],
                remaining_aggs,
                [False] * len(measure_fields),
                True,
            )

        if num_cals + len(remaining_aggs) != len(measure_fields):
            print(f"警告: 卡片 {card_id} {card_name}: 数值字段数量大于聚合函数数量,不合法")
            print(f"警告: 卡片 {card_id} {card_name}: 不添加任何数值字段.")
            return [], [], [], False

    new_measure_fields, new_measure_aggs, measure_is_aggregated = [], [], []
    agg_flag = False
    agg_iter = iter(remaining_aggs)
    for field in measure_fields:
        if field not in calculation_fields:
            # Plain field: consume the next configured aggregation.
            new_measure_fields.append(quote_identifier(field))
            new_measure_aggs.append(next(agg_iter))
            measure_is_aggregated.append(False)
            continue

        field_def = calculation_fields[field]
        formula = field_def["calculation"]["formula"]
        formula = resolve_calculation_formula(formula, calculation_fields, {field_def["field_id"]})
        new_measure_fields.append(quote_identifier(formula, formula=True))
        new_measure_aggs.append("NUL")
        is_aggregated = field_def["calculation"].get("isAggregated") is True
        # A non-aggregated calculated field must stay False so the caller can
        # add it to GROUP BY.
        measure_is_aggregated.append(is_aggregated)
        if is_aggregated:
            agg_flag = True

    return new_measure_fields, new_measure_aggs, measure_is_aggregated, agg_flag


def dedupe_sql_parts(parts):
    """Return *parts* without empty items and duplicates, order preserved."""
    deduped = []
    seen = set()
    for part in parts:
        if not part or part in seen:
            continue
        seen.add(part)
        deduped.append(part)
    return deduped


def quote_identifier(identifier, formula=False):
    """Quote an identifier, or the ``[field]`` references inside a formula.

    With ``formula=False`` the identifier is wrapped in back-quotes unless it
    is a plain ``[A-Za-z_][A-Za-z0-9_]*`` name. With ``formula=True`` every
    ``[field]`` becomes a back-quoted field, ``[DYNAMIC_PARAMS.x]`` becomes
    ``{{{DYNAMIC_PARAMS.x}}}`` and numeric subscripts such as ``[2]`` are kept.
    """
    if not QUOTE_FLAG:
        return identifier
    if not identifier:
        return ''

    if formula:
        params = re.findall(r"\[DYNAMIC_PARAMS\.\w+\]", identifier)
        for p in params:
            subs = p[1:-1]
            subs = "{{{" + subs + "}}}"
            identifier = identifier.replace(p, subs, 1)

        # Replace only the [field] wrappers from the configuration; keep
        # Hive subscript access like [2].
        def replace_bracket_identifier(match):
            content = match.group(1)
            if re.fullmatch(r"\d+", content.strip()):
                return match.group(0)
            return f"{IDENTIFIER_QUOTE}{content}{IDENTIFIER_QUOTE}"

        identifier = re.sub(r"\[([^\[\]]+)\]", replace_bracket_identifier, identifier)
    else:
        identifier = identifier.replace('\n', ' ')
        if not re.match(r'[a-zA-Z_][a-zA-Z0-9_]*$', identifier):
            return f'{IDENTIFIER_QUOTE}{identifier}{IDENTIFIER_QUOTE}'
    return identifier


def parse_multi_value_field(field_value):
    """Parse a column holding a Python-literal list; empty input gives ``[]``.

    A value that cannot be parsed is printed with the traceback and the
    exception is re-raised.
    """
    if not field_value or field_value == "":
        return []
    try:
        return ast.literal_eval(field_value)
    except Exception:
        print(field_value)
        print(traceback.format_exc())
        raise


def get_format_args(field, fd_type, op_dict, values):
    """Build the keyword arguments for an operator template.

    Values are quoted or normalised according to *fd_type*, then mapped to the
    placeholders the operator expects (``value``, ``value_1``/``value_2`` or
    ``values``), plus ``field``.
    """
    # NOTE(review): string values are embedded without escaping quotes.
    if fd_type in ('DECIMAL', 'DOUBLE', 'INT', 'FLOAT', 'LONG', 'SHORT'):
        # Drop missing values only; 0 is a legitimate numeric filter value.
        values = [x for x in values if x is not None and x != ""]
    elif fd_type in ('DATE', 'STRING', 'SUB_DATE', 'TIMESTAMP'):
        if op_dict.get('quote', True):
            values = [f"'{x}'" for x in values]
    elif fd_type == 'BOOL':
        values = [str(value).upper() for value in values]

    format_dict = {}
    value_nums = op_dict['val_nums']
    if value_nums == 9:
        format_dict["values"] = ", ".join(str(value) for value in values)
    elif value_nums == 2:
        format_dict["value_1"] = values[0]
        format_dict["value_2"] = values[1]
    elif value_nums == 1:
        format_dict["value"] = values[0]
    format_dict["field"] = field
    return format_dict


def get_consolidation_field(consolidation_dict):
    """Translate a consolidation (grouping) definition into a CASE expression.

    Supported ``groupType`` values: ``ITEM``, ``CONDITION``, ``CUSTOM_STEP``
    and ``FIXED_STEP``.

    Raises:
        ValueError: for an unknown group type or step operator, or a
            non-positive fixed step size.
    """
    field_name = quote_identifier(consolidation_dict["sourceName"])
    group_type = consolidation_dict["groupType"]
    fd_type = consolidation_dict["sourceFdType"]
    group_rules = consolidation_dict.get('groups') or []
    fixed_step = consolidation_dict.get('fixedStepSetting')

    when_part = []
    else_part = None

    if group_type == 'ITEM':
        for group in group_rules:
            group_name = group["groupName"]
            if group.get('isOtherGroup', False):
                else_part = f"ELSE '{group_name}'"
                continue
            selected_values = group.get('selectedValues', [])
            non_null_values = [value for value in selected_values if value is not None]
            predicates = []
            if non_null_values:
                # repr() yields the same literals as str(list)[1:-1].
                literals = ", ".join(repr(value) for value in non_null_values)
                predicates.append(f"{field_name} IN ({literals})")
            if len(non_null_values) != len(selected_values):
                predicates.append(f"{field_name} IS NULL")
            if not predicates:
                # An empty group would produce the invalid "IN ()".
                continue
            when_part.append(f"WHEN {' OR '.join(predicates)} THEN '{group_name}'")
    elif group_type == "CONDITION":
        for group in group_rules:
            group_name = group["groupName"]
            if group.get('isOtherGroup', False):
                else_part = f"ELSE '{group_name}'"
                continue
            rules = group["rules"]
            combine_type = " " + rules["combineType"] + " "  # padded for join
            cond_list = []
            for cond in rules["conditions"]:
                filter_value = cond["filterValue"]
                op_dict = FILTER_OPERATOR_MAP[cond["filterType"]]
                # Range operators carry a list already; wrap scalars only.
                cond_values = filter_value if isinstance(filter_value, list) else [filter_value]
                format_args = get_format_args(field_name, fd_type, op_dict, cond_values)
                cond_list.append(op_dict["template"].format(**format_args))
            when_part.append("WHEN " + combine_type.join(cond_list) + f" THEN '{group_name}'")
    elif group_type == "CUSTOM_STEP":
        step_templates = {
            'BT': "{f} BETWEEN {s} AND {e}",
            'OPEN_BT_CLOSE': "{f} > {s} AND {f} <= {e}",
            'OPEN_BT_OPEN': "{f} > {s} AND {f} < {e}",
            'CLOSE_BT_OPEN': "{f} >= {s} AND {f} < {e}",
        }
        for group in group_rules:
            group_name = group["groupName"]
            if group.get('isOtherGroup', False):
                else_part = f"ELSE '{group_name}'"
                continue
            setting = group['customStepSetting']
            operator = setting['operator']
            template = step_templates.get(operator)
            if template is None:
                raise ValueError(f"未知的操作符: {operator}")
            condition = template.format(f=field_name, s=setting['startValue'], e=setting['endValue'])
            when_part.append(f"WHEN {condition} THEN '{group_name}'")
    elif group_type == "FIXED_STEP":
        start = fixed_step['startValue']
        end = fixed_step['endValue']
        step = fixed_step['stepSize']
        if step <= 0:
            # A non-positive step would never reach `end`.
            raise ValueError(f"FIXED_STEP 的 stepSize 必须大于0: {step}")
        lower = start
        while lower < end:
            upper = lower + step
            when_part.append(
                f"WHEN {field_name} >= {lower} AND {field_name} < {upper} THEN '{lower}-{upper}'"
            )
            lower = upper
        # Closing interval, inclusive of `end`.
        when_part.append(
            f"WHEN {field_name} >= {lower} AND {field_name} <= {end} THEN '{lower}-{end}'"
        )
        else_part = "ELSE NULL"
    else:
        raise ValueError(f"未知的groupType: {group_type}")

    field = "CASE "
    field += "\n".join(when_part)
    if else_part:
        field += f"\n{else_part}"
    field += "\nEND"
    return field


def parse_filter_string(filter_relation_str, calculation_fields=None, window_alias_map=None, window_select_expressions=None):
    """Translate the JSON filter definition into SQL conditions.

    Returns:
        dict: ``{fdId: {"exp": <sql condition>, "agg": <bool>}}``. Conditions
        that cannot be translated are reported on stdout and skipped.

    Side effects:
        Window functions found in formulas are registered in
        *window_alias_map* / *window_select_expressions*.
    """
    conditions = {}
    if not filter_relation_str or filter_relation_str == "[]":
        return conditions
    if calculation_fields is None:
        calculation_fields = {}
    if window_alias_map is None:
        window_alias_map = {}
    if window_select_expressions is None:
        window_select_expressions = []

    raw_conditions = json.loads(filter_relation_str)

    for cond_dict in raw_conditions:
        fdId = cond_dict.get("fdId")
        field = cond_dict.get("name")
        fd_type = cond_dict.get("fdType")
        op_name = cond_dict.get("filterType")
        op_dict = FILTER_OPERATOR_MAP.get(op_name)
        values = cond_dict.get("filterValue")  # list
        is_aggregated = cond_dict.get("isAggregated", False)

        # Validate the condition.
        if any([fdId is None, field is None, fd_type is None, op_name is None, values is None]):
            print(f"fdId: {fdId} field: {field} fd_type: {fd_type} op_name: {op_name} values: {values}")
            print(f"警告: 无法解析筛选条件,缺少必须字段,跳过此条件。")
            continue
        if op_dict is None:
            print(f"警告: 无法解析筛选条件,未定义的筛选类型: {op_name},跳过此条件。")
            continue

        # Special operators.
        if op_dict == 'CUSTOM':
            if "advFilter" not in cond_dict:
                print(f"警告: CUSTOM筛选类型不存在advFilter, 跳过此条件。")
                continue
            if 'formula' in cond_dict:
                field = quote_identifier(cond_dict['formula'], formula=True)
                # Rewrite window functions first so no over(...) stays in WHERE.
                field = rewrite_window_max_over(field, calculation_fields, window_alias_map, window_select_expressions)
            else:
                field = quote_identifier(cond_dict['name'])
            expression = ADV_FILTER_EXP_MAP.get(cond_dict["advFilter"])
            if not expression:
                print(f"警告: CUSTOM筛选类型出现未定义的advFilter: {cond_dict['advFilter']}, 跳过此条件。")
                continue
            expression = expression.format(field=field)
            conditions[fdId] = {"exp": expression, "agg": is_aggregated}
            continue
        elif op_dict == 'SPARK_EXPR':
            if 'formula' in cond_dict:
                formula = quote_identifier(cond_dict['formula'], formula=True)
                # SPARK_EXPR may contain window functions too.
                formula = rewrite_window_max_over(formula, calculation_fields, window_alias_map, window_select_expressions)
                conditions[fdId] = {"exp": formula, "agg": is_aggregated}
            elif isinstance(cond_dict['filterValue'], list) and len(cond_dict['filterValue']) == 1:
                field = quote_identifier(cond_dict['name'])
                value = cond_dict['filterValue'][0]
                conditions[fdId] = {"exp": f"{field} = {value}", "agg": is_aggregated}
            else:
                print(f"警告: 无法解析筛选条件,SPARK_EXPR中未定义。跳过此条件。")
            # op_dict is a plain string here; the template handling below
            # must never run for it.
            continue

        # Regular operators.
        value_nums = op_dict["val_nums"]
        if value_nums != 9 and len(values) != value_nums:
            print(f"警告: 无法解析筛选条件,值数量与操作符不匹配。跳过此条件。")
            continue

        field = quote_identifier(field)
        if "consolidation" in cond_dict:
            # The consolidation CASE expression replaces the left-hand field.
            consolidation_field = get_consolidation_field(cond_dict["consolidation"])
            if not consolidation_field:
                print(f"警告: 无法解析consolidation字段。跳过此条件。")
                continue
            field = consolidation_field
        elif "formula" in cond_dict:
            field = quote_identifier(cond_dict["formula"], formula=True)
            field = rewrite_window_max_over(field, calculation_fields, window_alias_map, window_select_expressions)

        if op_name in ("NI", "IN") and len(values) == 0:
            print(f"警告: 无法解析筛选条件,IN或NI中参数个数为0。跳过此条件。")
            continue

        # A None member of IN / NOT IN becomes an explicit null test, combined
        # with the remaining values rather than overwritten by them.
        null_predicate = None
        if op_name in ('NI', 'IN') and None in values:
            null_predicate = f"{field} IS NULL" if op_name == 'IN' else f"{field} IS NOT NULL"
            values = [x for x in values if x is not None]
            if len(values) == 0:
                conditions[fdId] = {"exp": null_predicate, "agg": is_aggregated}
                continue

        format_args = get_format_args(field, fd_type, op_dict, values)
        condition_str = op_dict["template"].format(**format_args)
        if null_predicate:
            joiner = " OR " if op_name == 'IN' else " AND "
            condition_str = f"({null_predicate}{joiner}{condition_str})"
        conditions[fdId] = {"exp": condition_str, "agg": is_aggregated}

    return conditions


def build_sql_query(card_data, added_fields_info, dataset_fid_name_map):
    """Build the SQL pieces for one BI card.

    Args:
        card_data: mapping (dict or pandas row) with the card configuration.
        added_fields_info: DataFrame of the card's calculated fields.
        dataset_fid_name_map: DataFrame of the dataset's field definitions.

    Returns:
        tuple: ``(select_sql, where_json, group_by_clause, order_by_clause)``;
        four empty strings when the card cannot be translated.
    """
    card_id = card_data["card_id"]
    card_name = card_data["card_name"]

    dataset_id = card_data.get("ds_id")
    if not dataset_id:
        print(f"错误: {card_id} {card_name} 数据集ID为空.")
        return "", "", "", ""

    added_fields_info = get_added_fields_info(added_fields_info)
    dataset_fid_name_map = get_fid_name_map(dataset_fid_name_map)

    dimension_fids = parse_multi_value_field(card_data.get("field_id", []))
    dimension_fields = parse_multi_value_field(card_data.get("field_name", []))
    measure_fids = parse_multi_value_field(card_data.get("num_value_field_id", []))
    measure_fields = parse_multi_value_field(card_data.get("num_value_field_name", []))

    # "度量名" is an id-less pseudo dimension used for pivoting. Remove it
    # BEFORE zipping names with ids, otherwise later names are misaligned.
    if "度量名" in dimension_fields and len(dimension_fields) == len(dimension_fids) + 1:
        dimension_fields.remove("度量名")
    dimension_fid_name_map = dict(zip(dimension_fids, dimension_fields))
    dimension_name_fid_map = dict(zip(dimension_fields, dimension_fids))

    measure_aggs = parse_multi_value_field(card_data.get("num_value_field_merge_way", []))
    filter_relation_str = card_data.get("filters_field_value_name_rela")

    sort_fids = parse_multi_value_field(card_data.get("sort_field_id", []))
    sort_fields = parse_multi_value_field(card_data.get("sort_field_name", []))
    sort_method = parse_multi_value_field(card_data.get("sort_way", []))

    all_field_ids = (
        dimension_fids
        + parse_multi_value_field(card_data.get("filters_field_id", []))
        + sort_fids
        + measure_fids
    )
    all_field_names = (
        dimension_fields
        + parse_multi_value_field(card_data.get("filters_field_name", []))
        + sort_fields
        + measure_fields
    )
    all_field_id_name_map = dict(zip(all_field_ids, all_field_names))

    fields_rename_map = get_fields_rename_map(card_data.get("field_info", ""))

    # field id -> output column name, used to resolve ORDER BY columns.
    selected_fid_alias_map = dict(zip(dimension_fids + measure_fids, dimension_fields + measure_fields))

    # ---- SELECT ----
    select_parts = []
    has_aggregation = False
    non_aggregated_select_parts = []

    for field in dimension_fields:
        fid = dimension_name_fid_map[field]
        alias = fields_rename_map.get(field)
        if alias and alias != "null":
            select_parts.append(f"{quote_identifier(field)} AS {quote_identifier(alias)}")
            selected_fid_alias_map[fid] = alias
        else:
            select_parts.append(f"{quote_identifier(field)}")
            selected_fid_alias_map[fid] = field

    new_measure_fields, measure_aggs, measure_is_aggregated, agg_flag = process_measure_fields(
        measure_fields, measure_aggs, added_fields_info, card_id, card_name
    )
    if agg_flag:
        has_aggregation = True

    for i, field in enumerate(new_measure_fields):
        fid = measure_fids[i]
        # Look the alias up by field NAME; `field` may be an expanded formula.
        alias = fields_rename_map.get(measure_fields[i])

        agg_func_template = AGGREGATION_MAP.get(measure_aggs[i])
        if not agg_func_template:
            # No aggregation wrapper: either a dimension-like value or a
            # formula that aggregates by itself.
            if not alias or alias == "null":
                alias = measure_fields[i]
            select_parts.append(f"{field} AS {quote_identifier(alias)}")
            # A formula without aggregate calls behaves like a dimension and
            # must be part of GROUP BY.
            if not measure_is_aggregated[i] and field and re.search(AGGREGATION_PATTERN, field) is None:
                if re.match(r"\d+", field):
                    non_aggregated_select_parts.append(quote_identifier(field))
                else:
                    non_aggregated_select_parts.append(field)
            selected_fid_alias_map[fid] = alias
        else:
            has_aggregation = True
            if '{}' in agg_func_template:  # COUNT(DISTINCT ...)
                agg_expression = agg_func_template.format(field)
            else:
                agg_expression = f"{agg_func_template}({field})"
            if not alias or alias == "null":
                suffix = AGGREGATION_SUFFIX_MAP.get(measure_aggs[i])
                alias = f"{measure_fields[i]}_{suffix}"
            select_parts.append(f"{agg_expression} AS {quote_identifier(alias)}")
            selected_fid_alias_map[fid] = alias

    # Card configs may repeat fields. Only identical SELECT expressions are
    # removed; different expressions sharing an alias are kept.
    select_parts = dedupe_sql_parts(select_parts)

    if not select_parts:
        print(f"错误: {card_id} {card_name} 没有select字段。")
        return '', '', '', ''
    select_clause = "SELECT " + ",\n ".join(select_parts)

    # ---- WHERE ----
    filter_conditions = {}
    window_alias_map = {}
    window_select_expressions = []
    try:
        # Also collects the window expressions that must move into WITH.
        filter_conditions = parse_filter_string(
            filter_relation_str, added_fields_info, window_alias_map, window_select_expressions
        )
    except Exception as e:
        print(f"错误: 卡片 {card_id} {card_name} 解析筛选条件出错:{e}。WHERE字句缺失。")
        print("详细错误信息:")
        print(traceback.format_exc())

    # ---- WITH ----
    with_part = ""
    # Derived date fields have ids of the form "<fieldId>_<datePart>".
    new_date_fields = []
    for fid, name in all_field_id_name_map.items():
        fid_splits = str(fid).split('_')
        if len(fid_splits) == 2:
            new_date_fields.append((fid, name))
            selected_fid_alias_map[fid_splits[0]] = name

    # Calculated dimensions.
    new_dimension_fields = [
        (fid, name) for fid, name in dimension_fid_name_map.items() if fid in added_fields_info
    ]

    if new_date_fields or new_dimension_fields or window_select_expressions:
        required_base_fields = collect_with_base_fields(
            all_field_names,
            measure_fields,
            new_date_fields,
            new_dimension_fields,
            dataset_fid_name_map,
            added_fields_info,
            filter_relation_str,
        )
        with_part = build_with_part(
            new_date_fields,
            new_dimension_fields,
            dataset_fid_name_map,
            added_fields_info,
            dataset_id,
            required_base_fields,
            window_select_expressions,
        )

    # ---- FROM ----
    if with_part:
        from_clause = "FROM tmp"
    else:
        from_clause = f"FROM {quote_identifier(str(dataset_id))}"

    # ---- GROUP BY ----
    group_by_clause = ""
    if has_aggregation:
        group_by_parts = [quote_identifier(field) for field in dimension_fields]
        group_by_parts.extend(non_aggregated_select_parts)
        group_by_parts = dedupe_sql_parts(group_by_parts)
        if group_by_parts:
            group_by_clause = "GROUP BY " + ", ".join(group_by_parts)

    # ---- ORDER BY ----
    order_by_clause = ""
    if sort_fields and sort_method and len(sort_fields) == len(sort_method):
        order_by_parts = []
        for i, _ in enumerate(sort_fields):
            fid = sort_fids[i]
            if fid not in selected_fid_alias_map:
                continue
            alias = selected_fid_alias_map[fid]
            order_by_parts.append(f"{quote_identifier(alias)} {sort_method[i]}")
        if order_by_parts:
            order_by_clause = "ORDER BY " + ", ".join(order_by_parts)

    sql_parts = [with_part, select_clause, from_clause]

    # select, where, group by, order by
    return (
        ("\n".join(sql_parts)).strip(),
        json.dumps(filter_conditions, ensure_ascii=False),
        group_by_clause,
        order_by_clause,
    )


def generate(start=None, end=None, test_card_id=None):
    """Generate SQL for every chart card found in ``data/card.csv``.

    Args:
        start, end: optional row-index bounds (inclusive) of the cards to process.
        test_card_id: when given, only this card is processed.

    Returns:
        pandas.DataFrame with columns ``card_id``, ``card_name``, ``select``,
        ``where``, ``groupby`` and ``orderby``.
    """
    res_list = []
    df = pd.read_csv("data/card.csv").fillna("").reset_index()
    add_field_info = pd.read_csv("data/calc.csv").fillna('').set_index("card_id")
    all_field_info = pd.read_csv("data/field.csv").fillna('').set_index("ds_id")

    for i, row in df.iterrows():
        if start and i < start:
            continue
        if end and i > end:
            break
        card_id = row["card_id"]
        if test_card_id and card_id != test_card_id:
            continue
        if row["card_type_cd"] != '图表' or row["ds_id"] == "":
            continue

        # A card without calculated fields is normal.
        try:
            added_fields_info = add_field_info.loc[[card_id]]
        except KeyError:
            added_fields_info = pd.DataFrame()
        try:
            dataset_fid_name_map = all_field_info.loc[[row["ds_id"]]]
        except KeyError:
            print(f"错误: 没有数据及字段信息: {card_id}")
            continue

        select, where, groupby, orderby = '', '', '', ''
        try:
            select, where, groupby, orderby = build_sql_query(row, added_fields_info, dataset_fid_name_map)
        except Exception as e:
            print(f"错误: 卡片 {card_id} 发生未知错误: {e}")
            print(i, traceback.format_exc())
        if not select:
            print(f"{card_id} 生成失败")
            continue
        res_list.append([str(card_id), str(row["card_name"]), select, where, groupby, orderby])

    res_df = pd.DataFrame(res_list, columns=["card_id", "card_name", "select", 'where', 'groupby', 'orderby'])
    return res_df


if __name__ == "__main__":
    df = generate()
    df.to_parquet("output/sql.parquet")
    df.to_excel("output/sql.xlsx")