| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924 |
- import ast
- import json
- import re
- import sys
- import traceback
- import pandas as pd
# Aggregation functions.
# Maps each aggregation code found in the card configuration to the SQL it is
# rendered as. A value containing "{}" is a complete template that receives the
# column expression; any other value is a bare function name wrapped around it.
AGGREGATION_MAP = dict(
    SUM='SUM',
    AVG='AVG',
    CNT='COUNT',
    MAX='MAX',
    MIN='MIN',
    CNT_DISTINCT='COUNT(DISTINCT {})',
)

# Suffix joined to a measure name (with "_") to build its default column alias
# when no alias is configured. These are runtime strings, not comments.
AGGREGATION_SUFFIX_MAP = dict(
    SUM='求和',
    AVG='均值',
    CNT='计数',
    MAX='最大值',
    MIN='最小值',
    CNT_DISTINCT='去重计数',
)
# Filter operators.
# Each regular operator maps to:
#   val_nums - number of values the template consumes (9 means "any number",
#              rendered as one comma separated list),
#   template - str.format template for the SQL predicate,
#   quote    - optional; when False, string-like values are NOT wrapped in
#              single quotes before substitution (defaults to True).
# The LIKE family already carries the quotes inside the template, so it must
# opt out of value quoting; otherwise the predicate renders as
# LIKE '%'abc'%', which is not valid SQL.
# 'CUSTOM' and 'SPARK_EXPR' are markers handled by dedicated branches and are
# deliberately plain strings instead of dicts.
FILTER_OPERATOR_MAP = {
    'BT': {"val_nums": 2, 'template': "{field} BETWEEN {value_1} AND {value_2}"},
    'CLOSE_BT_OPEN': {"val_nums": 2, 'template': "{field} >= {value_1} AND {field} < {value_2}"},
    'EQ': {"val_nums": 1, 'template': "{field} = {value}"},
    'GE': {"val_nums": 1, 'template': "{field} >= {value}"},
    'GT': {"val_nums": 1, 'template': "{field} > {value}"},
    'IN': {"val_nums": 9, 'template': "{field} IN ({values})"},
    'IS_NULL': {"val_nums": 0, 'template': "{field} IS NULL"},
    'LE': {"val_nums": 1, 'template': "{field} <= {value}"},
    'LT': {"val_nums": 1, 'template': "{field} < {value}"},
    'NE': {"val_nums": 1, 'template': "{field} != {value}"},
    'NI': {"val_nums": 9, 'template': "{field} NOT IN ({values})"},
    'NOT_NULL': {"val_nums": 0, 'template': "{field} IS NOT NULL"},
    'OPEN_BT_CLOSE': {"val_nums": 2, 'template': "{field} > {value_1} AND {field} <= {value_2}"},
    'OPEN_BT_OPEN': {"val_nums": 2, 'template': "{field} > {value_1} AND {field} < {value_2}"},
    'CONTAINS': {"val_nums": 1, 'quote': False, 'template': "{field} LIKE '%{value}%'"},
    'NOT_CONTAINS': {"val_nums": 1, 'quote': False, 'template': "{field} NOT LIKE '%{value}%'"},
    'STARTS_WITH': {"val_nums": 1, 'quote': False, 'template': "{field} LIKE '{value}%'"},
    'NOT_STARTS_WITH': {"val_nums": 1, 'quote': False, 'template': "{field} NOT LIKE '{value}%'"},
    'ENDS_WITH': {"val_nums": 1, 'quote': False, 'template': "{field} LIKE '%{value}'"},
    'NOT_ENDS_WITH': {"val_nums": 1, 'quote': False, 'template': "{field} NOT LIKE '%{value}'"},
    'CUSTOM': 'CUSTOM',
    'SPARK_EXPR': 'SPARK_EXPR',
}
# Identifier quoting.
IDENTIFIER_QUOTE = '`'
QUOTE_FLAG = True

# Matches one "max(<arg>) over (<window>)" window expression. Both groups are
# matched lazily, so nested parentheses inside <arg> or <window> are not
# supported.
WINDOW_MAX_OVER_PATTERN = re.compile(
    r"max\s*\(\s*(?P<arg>.*?)\s*\)\s*over\s*\(\s*(?P<window>.*?)\s*\)",
    flags=re.IGNORECASE | re.DOTALL,
)

# Detects a call to an aggregate function inside an expression.
# The function name must be directly followed by "(". The earlier form had a
# stray "|" before "\s*\(", so it matched any opening parenthesis and any
# identifier that merely started with an aggregate name (e.g. `summary`).
AGGREGATION_PATTERN = re.compile(
    r"\b(sum|avg|count|max|min|stddev|variance|collect_list|collect_set|percentile|percentile_approx)\s*\(",
    flags=re.IGNORECASE,
)
# Relative-date ("adverb") filters.
# Every value is a str.format template taking only {field}. The doubled braces
# around "today" survive formatting as the literal placeholder {today}, which is
# expected to be substituted later, outside this module's visible code.
# dayofweek() is used with Sunday = 1, so the week arithmetic below treats
# Monday as the first day of the week.
ADV_FILTER_EXP_MAP = {
    'TODAY': "{field} = '{{today}}'",
    'YESTERDAY': "{field} = date_sub('{{today}}', 1)",
    'DAY_BEFORE_YESTERDAY': "{field} = date_sub('{{today}}', 2)",
    'LAST_14_DAY': "{field} between date_sub('{{today}}', 13) and '{{today}}'",
    'LAST_1_YEAR': "{field} between date_sub('{{today}}', 364) and '{{today}}'",
    'LAST_30_DAY': "{field} between date_sub('{{today}}', 29) and '{{today}}'",
    'LAST_7_DAY': "{field} between date_sub('{{today}}', 6) and '{{today}}'",
    'LAST_90_DAY': "{field} between date_sub('{{today}}', 89) and '{{today}}'",
    # First day of the previous month .. last day of the previous month.
    # The offset is taken from the already shifted date because add_months()
    # clamps the day at month end (Mar 31 -> Feb 28); using day(today) there
    # would land in the wrong month. The function is date_sub, not datesub.
    'LAST_MONTH': (
        "{field} between date_sub(add_months('{{today}}', -1), day(add_months('{{today}}', -1)) - 1)"
        " and date_sub('{{today}}', day('{{today}}'))"
    ),
    'LAST_WEEK': (
        "{field} between date_sub('{{today}}', case when dayofweek('{{today}}') = 1 then 13 else dayofweek('{{today}}')+5 end)"
        " and date_sub('{{today}}', case when dayofweek('{{today}}') = 1 then 7 else dayofweek('{{today}}')-1 end)"
    ),
    'MONTH_BEFORE_LAST_MONTH': (
        "{field} between date_sub(add_months('{{today}}', -2), day(add_months('{{today}}', -2))-1)"
        " and date_sub(add_months('{{today}}', -1), day(add_months('{{today}}', -1)))"
    ),
    'MONTH_TO_DAY': "{field} between date_sub('{{today}}', day('{{today}}')-1) and '{{today}}'",
    'MONTH_TO_YESTERDAY': "{field} between date_sub('{{today}}', day('{{today}}')-1) and date_sub('{{today}}', 1)",
    'QUARTER_TO_DAY': (
        "{field} between concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01'\n"
        "    when quarter('{{today}}') = 2 then '-04-01'\n"
        "    when quarter('{{today}}') = 3 then '-07-01'\n"
        "    when quarter('{{today}}') = 4 then '-10-01' end) and '{{today}}'"
    ),
    'QUARTER_TO_YESTERDAY': (
        "{field} between concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01'\n"
        "    when quarter('{{today}}') = 2 then '-04-01'\n"
        "    when quarter('{{today}}') = 3 then '-07-01'\n"
        "    when quarter('{{today}}') = 4 then '-10-01' end) and date_sub('{{today}}', 1)"
    ),
    'WEEK_TO_DAY': "{field} between date_sub('{{today}}', (dayofweek('{{today}}')+5)%7) and '{{today}}'",
    'WEEK_TO_YESTERDAY': "{field} between date_sub('{{today}}', (dayofweek('{{today}}')+5)%7) and date_sub('{{today}}', 1)",
    'YEAR_TO_DAY': "{field} between concat(year('{{today}}'), '-01-01') and '{{today}}'",
    'YEAR_TO_LAST_MONTH': "{field} between concat(year('{{today}}'), '-01-01') and date_sub('{{today}}', day('{{today}}'))",
    'YEAR_TO_LAST_QUARTER': (
        "{field} between concat(year('{{today}}'), '-01-01') and date_sub(concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01'\n"
        "    when quarter('{{today}}') = 2 then '-04-01'\n"
        "    when quarter('{{today}}') = 3 then '-07-01'\n"
        "    when quarter('{{today}}') = 4 then '-10-01' end), 1)"
    ),
    'YEAR_TO_YESTERDAY': "{field} between concat(year('{{today}}'), '-01-01') and date_sub('{{today}}', 1)",
}
# Built-in date part conversions.
# Keyed by the suffix of a derived field id ("<fid>_<part>"); each value is a
# str.format template taking {old} (source column) and {new} (output alias).
# NOTE(review): 'hour' and 'minute' both subtract 8. For 'hour' this looks like
# a UTC+8 adjustment (and can go negative); for 'minute' it is presumably a
# copy-paste slip - confirm before relying on either.
PARTIAL_DATE_EXPRESSION = {
    'day': "`{old}` as `{new}`",
    # The closing parenthesis of concat( was missing, which produced
    # unparsable SQL for every month-level field.
    'month': "concat(year(`{old}`), '-', lpad(month(`{old}`), 2, '0')) as `{new}`",
    'year': "year(`{old}`) as `{new}`",
    'quarter': "concat(year(`{old}`), '-S', quarter(`{old}`)) as `{new}`",
    'week': "weekofyear(`{old}`) as `{new}`",
    'dayofweek': "dayofweek(`{old}`)-1 as `{new}`",
    'hour': "hour(`{old}`)-8 as `{new}`",
    'minute': "minute(`{old}`)-8 as `{new}`",
}
# Load added (calculated) field definitions.
def get_added_fields_info(added_fields_df):
    """Index calculated-field definitions by both field id and field name.

    Args:
        added_fields_df: DataFrame with a ``calc_field_logic`` column holding
            one JSON object per row; each object needs ``name`` and ``fdId``.

    Returns:
        dict with two entries per usable row:
        ``{fdId: {"field_name", "calculation"}, name: {"field_id", "calculation"}}``
        where ``calculation`` is the parsed JSON object. Empty when the frame
        is empty.

    Rows that are not valid JSON (including missing/NaN cells), are not JSON
    objects, or lack ``name``/``fdId`` are reported on stdout and skipped.
    """
    added_fields_info = {}
    if added_fields_df.empty:
        return added_fields_info

    for _, row in added_fields_df.iterrows():
        raw_logic = row['calc_field_logic']
        try:
            content = json.loads(raw_logic)
        except (TypeError, ValueError):
            # TypeError: the cell is not text (None / NaN);
            # ValueError: malformed JSON (JSONDecodeError is a subclass).
            print(f'ERROR: 新增字段解析错误: {raw_logic}')
            continue
        if not isinstance(content, dict) or 'name' not in content or 'fdId' not in content:
            # Valid JSON, but not a usable definition; skip instead of crashing
            # the whole card on one bad row.
            print(f'ERROR: 新增字段解析错误: {raw_logic}')
            continue

        field_name = content['name']
        field_id = content['fdId']
        added_fields_info[field_id] = {"field_name": field_name, 'calculation': content}
        added_fields_info[field_name] = {"field_id": field_id, 'calculation': content}
    return added_fields_info
def get_fid_name_map(field_def_df):
    """Build a ``field_id -> field_name`` lookup from a field definition frame.

    Rows are visited in frame order, so a repeated ``field_id`` keeps the name
    of its last row.
    """
    return {
        record['field_id']: record['field_name']
        for _, record in field_def_df.iterrows()
    }
# Field rename mapping.
def get_fields_rename_map(field_info):
    """Extract the ``name -> alias`` mapping from a card's ``field_info`` JSON.

    Args:
        field_info: JSON text of an object with optional ``dimensions`` and
            ``metrics`` lists whose items carry ``name`` and ``alias``.

    Returns:
        dict of field name to alias. Metrics are applied after dimensions, so a
        metric wins when both sections define the same name. An empty dict is
        returned when the input is missing, is not valid JSON, or does not
        decode to an object (for example the JSON literal ``null``).

    Raises:
        KeyError: if an entry lacks ``name`` or ``alias``.
    """
    rename_map = {}
    try:
        parsed = json.loads(field_info)
    except (TypeError, ValueError):
        # TypeError: not text at all (e.g. None); ValueError: malformed JSON.
        return rename_map
    if not isinstance(parsed, dict):
        return rename_map

    for section in ("dimensions", "metrics"):
        entries = parsed.get(section)
        # The upstream data sometimes stores the string 'null' in place of a list.
        if not entries or entries == 'null':
            continue
        for entry in entries:
            rename_map[entry["name"]] = entry["alias"]
    return rename_map
# Recursively expand nested calculated fields.
def resolve_calculation_formula(formula, calculation_fields, visited=None):
    """Inline every ``[name]`` reference that points at a calculated field.

    Args:
        formula: formula text using ``[field]`` references.
        calculation_fields: lookup of calculated-field definitions; each entry
            holds a ``calculation`` dict and optionally a ``field_id``.
        visited: ids of the calculated fields currently being expanded, used
            for cycle detection.

    Returns:
        The formula with each known reference replaced by its own expanded
        formula in parentheses. Unknown references, and references whose
        formula contains "consolidation", are left untouched. A falsy formula
        is returned as is.

    Raises:
        ValueError: when a calculated field refers back to one in ``visited``.
    """
    if not formula:
        return formula
    in_progress = set() if visited is None else visited

    def _expand(reference):
        untouched = reference.group(0)
        ref_name = reference.group(1).strip()
        definition = calculation_fields.get(ref_name)
        if not definition:
            return untouched
        # Entries keyed by id carry no "field_id", so fall back to the key.
        ref_id = definition.get("field_id") or ref_name
        if ref_id in in_progress:
            raise ValueError(f"计算字段存在循环引用: {ref_name}")
        inner_formula = definition["calculation"].get("formula", "")
        if "consolidation" in inner_formula:
            return untouched
        expanded = resolve_calculation_formula(inner_formula, calculation_fields, in_progress | {ref_id})
        return "({})".format(expanded)

    return re.sub(r"\[([^\[\]]+)\]", _expand, formula)
def extract_formula_field_refs(formula):
    """Return the set of field names referenced as ``[field]`` in a formula.

    Names are stripped of surrounding whitespace. Purely numeric bracket
    contents such as ``[2]`` are skipped because they are subscripts rather
    than field references. A falsy formula yields an empty set.
    """
    if not formula:
        return set()
    stripped_names = (found.group(1).strip() for found in re.finditer(r"\[([^\[\]]+)\]", formula))
    return {
        name
        for name in stripped_names
        if name and re.fullmatch(r"\d+", name) is None
    }
def collect_formula_dependencies(formula, calculation_fields, visited=None):
    """Collect the non-calculated fields a formula ultimately depends on.

    Calculated fields are drilled into recursively; every referenced name that
    has no entry in ``calculation_fields`` is reported as a dependency.

    Args:
        formula: either a ``[field]`` style formula, or JSON text containing a
            ``consolidation`` object whose ``sourceName`` is the only reference.
        calculation_fields: lookup of calculated-field definitions.
        visited: ids of calculated fields already on the current path.

    Returns:
        Set of field names. Empty for a falsy formula or a consolidation
        without ``sourceName``.

    Raises:
        ValueError: when a calculated field on the path is referenced again.
    """
    if not formula:
        return set()
    path_ids = visited if visited is not None else set()

    # Work out which names this formula refers to directly.
    if "consolidation" in formula:
        source = json.loads(formula)["consolidation"].get("sourceName")
        direct_refs = [source] if source else []
    else:
        direct_refs = extract_formula_field_refs(formula)

    base_fields = set()
    for ref_name in direct_refs:
        definition = calculation_fields.get(ref_name)
        if not definition:
            # Not a calculated field: it is itself a dependency.
            base_fields.add(ref_name)
            continue
        ref_id = definition.get("field_id") or ref_name
        if ref_id in path_ids:
            raise ValueError(f"计算字段存在循环引用: {ref_name}")
        child_formula = definition["calculation"].get("formula", "")
        base_fields |= collect_formula_dependencies(child_formula, calculation_fields, path_ids | {ref_id})
    return base_fields
def collect_filter_dependencies(filter_relation_str, calculation_fields):
    """Collect the fields that a card's filter conditions depend on.

    Filter formulas may reference fields that are not selected anywhere else,
    so they have to be known before the WITH column list is built.

    For every condition the result includes its ``name``, the dependencies of
    its ``formula`` and the dependencies of its ``consolidation`` source.
    An empty set is returned for a missing filter, the literal ``"[]"``, or
    text that fails to parse as JSON.
    """
    if not filter_relation_str or filter_relation_str == "[]":
        return set()
    try:
        parsed_conditions = json.loads(filter_relation_str)
    except Exception:
        return set()

    needed = set()
    for condition in parsed_conditions:
        condition_name = condition.get("name")
        if condition_name:
            needed.add(condition_name)

        condition_formula = condition.get("formula")
        if condition_formula:
            needed |= collect_formula_dependencies(condition_formula, calculation_fields)

        grouping = condition.get("consolidation")
        if not grouping:
            continue
        source_name = grouping.get("sourceName")
        if not source_name:
            continue
        source_definition = calculation_fields.get(source_name)
        if not source_definition:
            # Grouping over a plain field: the field itself is the dependency.
            needed.add(source_name)
            continue
        source_formula = source_definition["calculation"].get("formula", "")
        already_visited = {source_definition.get("field_id") or source_name}
        needed |= collect_formula_dependencies(source_formula, calculation_fields, already_visited)
    return needed
def collect_with_base_fields(
    all_field_names,
    measure_fields,
    new_date_fields,
    new_dimension_fields,
    dataset_fid_name_map,
    added_fields_info,
    filter_relation_str,
):
    """Work out which raw dataset columns the WITH sub-query has to expose.

    Only the columns that the later SELECT / WHERE / ORDER BY really need are
    kept, instead of selecting the entire dataset into the temporary table.

    Args:
        all_field_names: every field name used by the card.
        measure_fields: measure field names (may include calculated fields).
        new_date_fields: ``(fid, name)`` pairs of date-part fields, whose fid
            looks like ``<source fid>_<part>``.
        new_dimension_fields: ``(fid, name)`` pairs of calculated dimensions.
        dataset_fid_name_map: ``field_id -> field_name`` of the dataset.
        added_fields_info: calculated-field definitions keyed by id and name.
        filter_relation_str: JSON text of the filter conditions.

    Returns:
        Set of required field names.
    """
    def _underlying_fields(definition_key, visited_id):
        # Raw fields reachable from one calculated field's formula.
        formula = added_fields_info[definition_key]["calculation"].get("formula", "")
        return collect_formula_dependencies(formula, added_fields_info, {visited_id})

    known_columns = set(dataset_fid_name_map.values())
    needed = {candidate for candidate in all_field_names if candidate in known_columns}

    # Date-part fields need the column (or formula inputs) they derive from.
    for date_fid, _ in new_date_fields:
        source_fid = date_fid.split('_')[0]
        if source_fid in dataset_fid_name_map:
            needed.add(dataset_fid_name_map[source_fid])
        elif source_fid in added_fields_info:
            needed |= _underlying_fields(source_fid, source_fid)

    for dimension_fid, _ in new_dimension_fields:
        needed |= _underlying_fields(dimension_fid, dimension_fid)

    for measure in measure_fields:
        if measure in added_fields_info:
            needed |= _underlying_fields(measure, added_fields_info[measure]["field_id"])

    needed |= collect_filter_dependencies(filter_relation_str, added_fields_info)
    return needed
def resolve_window_expression_fields(expression, calculation_fields):
    """Replace back-quoted calculated-field names in a window expression.

    A window column emitted into the WITH sub-query cannot refer to a
    calculated field by name, because that alias does not exist there; the
    field is therefore expanded to its formula first.

    Back-quoted names without an entry in ``calculation_fields`` are kept as
    they are. Known names are replaced by their parenthesised definition:
    the CASE expression for a consolidation field, otherwise the recursively
    expanded and quoted formula. A falsy expression is returned unchanged.
    """
    if not expression:
        return expression

    def _inline_definition(quoted):
        name = quoted.group(1).strip()
        definition = calculation_fields.get(name)
        if not definition:
            return quoted.group(0)
        definition_id = definition.get("field_id") or name
        formula = definition["calculation"].get("formula", "")
        if "consolidation" in formula:
            body = get_consolidation_field(json.loads(formula)["consolidation"])
        else:
            expanded = resolve_calculation_formula(formula, calculation_fields, {definition_id})
            body = quote_identifier(expanded, formula=True)
        return "(" + body + ")"

    return re.sub(r"`([^`]+)`", _inline_definition, expression)
def rewrite_window_max_over(expression, calculation_fields, window_alias_map, window_select_expressions):
    """Move ``max(...) over (...)`` out of a filter expression.

    Hive / Spark SQL do not allow window functions directly in WHERE/HAVING.
    Each window expression is therefore replaced by a generated column alias,
    and the expression itself is queued for the WITH sub-query.

    Args:
        expression: filter expression to rewrite.
        calculation_fields: calculated-field definitions, used to expand
            fields referenced inside the window expression.
        window_alias_map: mutated in place; maps the whitespace-collapsed,
            lower-cased window expression to its alias so that repeated
            expressions share one column.
        window_select_expressions: mutated in place; receives one
            ``<expression> AS <alias>`` entry per new alias.

    Returns:
        The expression with window calls replaced by quoted aliases; a falsy
        expression is returned unchanged.
    """
    if not expression:
        return expression

    def _to_alias(found):
        window_sql = resolve_window_expression_fields(found.group(0).strip(), calculation_fields)
        dedupe_key = re.sub(r"\s+", " ", window_sql).lower()
        known_alias = window_alias_map.get(dedupe_key)
        if known_alias:
            return quote_identifier(known_alias)
        new_alias = f"window_max_over_{len(window_alias_map) + 1}"
        window_alias_map[dedupe_key] = new_alias
        window_select_expressions.append(f"{window_sql} AS {quote_identifier(new_alias)}")
        return quote_identifier(new_alias)

    return WINDOW_MAX_OVER_PATTERN.sub(_to_alias, expression)
def build_with_part(
    new_date_fields,
    new_dimension_fields,
    dataset_fid_name_map,
    added_fields_info,
    dataset_id,
    required_base_fields,
    extra_with_expressions=None,
):
    """Render the ``WITH tmp as (...)`` sub-query that feeds the main SELECT.

    The column list is, in order: the required raw dataset columns, the
    date-part expressions, the calculated dimensions, and any extra
    expressions supplied by the caller.

    Args:
        new_date_fields: ``(fid, new_name)`` pairs, fid being
            ``<source fid>_<date part>``.
        new_dimension_fields: ``(fid, name)`` pairs of calculated dimensions;
            the output alias is taken from the definition, not from the pair.
        dataset_fid_name_map: ``field_id -> field_name`` of the dataset.
        added_fields_info: calculated-field definitions.
        dataset_id: id of the dataset, used as the table name.
        required_base_fields: raw column names that must be kept.
        extra_with_expressions: additional ready-made column expressions,
            mainly window columns extracted from the WHERE clause.

    Returns:
        The WITH clause text.

    Raises:
        ValueError: for a date field whose source is unknown, or whose date
            part has no conversion template.
    """
    # A raw column is dropped when a derived column takes over its name.
    shadowed_names = {derived_name for _, derived_name in new_date_fields}
    shadowed_names.update(added_fields_info[fid]["field_name"] for fid, _ in new_dimension_fields)

    raw_columns = []
    emitted = set()
    for column in dataset_fid_name_map.values():
        keep = (
            column not in shadowed_names
            and column not in emitted
            # Only columns flagged as needed during dependency collection.
            and column in required_base_fields
        )
        if keep:
            emitted.add(column)
            raw_columns.append(quote_identifier(column))

    derived_columns = []
    for fid, new_name in new_date_fields:
        old_fid, partial_date = fid.split('_')
        if old_fid in dataset_fid_name_map:
            old_name = dataset_fid_name_map[old_fid]
        elif old_fid in added_fields_info:
            # NOTE(review): the brackets are stripped and the remainder is later
            # wrapped in back-quotes by the template, which presumably only
            # works when the formula is a single [field] reference - confirm.
            old_name = added_fields_info[old_fid]["calculation"]["formula"].replace("[", "").replace("]", '')
        else:
            raise ValueError(f"字段 {fid} {new_name} 不存在")
        template = PARTIAL_DATE_EXPRESSION.get(partial_date, None)
        if not template:
            raise ValueError(f"日期转换方式 {partial_date} 不存在")
        derived_columns.append(template.format(old=old_name, new=new_name))

    for fid, _ in new_dimension_fields:
        definition = added_fields_info[fid]
        alias = definition["field_name"]
        formula = definition["calculation"]["formula"]
        if "consolidation" in formula:
            column_sql = get_consolidation_field(json.loads(formula)["consolidation"])
        else:
            # Expand nested calculated fields before quoting the references.
            expanded = resolve_calculation_formula(formula, added_fields_info, {fid})
            column_sql = quote_identifier(expanded, formula=True)
        derived_columns.append(column_sql + f" AS `{alias}`")

    if extra_with_expressions:
        derived_columns.extend(extra_with_expressions)

    select_list = ",\n".join(raw_columns + derived_columns)
    source_table = quote_identifier(str(dataset_id))
    return f"WITH tmp as (\nSELECT {select_list}\nFROM {source_table}\n)"
# Process measure (numeric) fields.
def process_measure_fields(measure_fields, measure_aggs, calculation_fields, card_id, card_name):
    """Pair every measure field with an aggregation code.

    Calculated fields are expanded to their quoted formula and always receive
    the placeholder aggregation ``"NUL"``; plain fields consume the entries of
    ``measure_aggs`` in order.

    Args:
        measure_fields: measure field names in display order.
        measure_aggs: aggregation codes. This list is never modified.
        calculation_fields: calculated-field definitions keyed by name.
        card_id: card id, only used in warnings.
        card_name: card name, only used in warnings.

    Returns:
        ``(fields, aggs, is_aggregated, agg_flag)``:
        the SQL expressions, their aggregation codes, a per-field flag list,
        and whether aggregation is present. All three lists are empty and
        the flag is False when the configuration is inconsistent (a warning
        is printed).
    """
    field_count = len(measure_fields)
    agg_count = len(measure_aggs)

    # Fewer fields than aggregations can never be matched up.
    if field_count < agg_count:
        print(f"警告: 卡片 {card_id} {card_name}: 数值字段数量小于聚合函数数量,不合法")
        print(f"警告: 卡片 {card_id} {card_name}: 不添加任何数值字段.")
        return [], [], [], False

    # More fields than aggregations: the surplus must be calculated fields.
    if field_count > agg_count:
        num_cals = sum(1 for field in measure_fields if field in calculation_fields)
        if num_cals == 0:
            # No calculated field at all: pad with placeholder aggregations.
            # A new list is built so the caller's list is left untouched.
            padded_aggs = list(measure_aggs) + ['NULL'] * (field_count - agg_count)
            return [quote_identifier(field) for field in measure_fields], padded_aggs, [False] * field_count, True
        if num_cals + agg_count != field_count:
            print(f"警告: 卡片 {card_id} {card_name}: 数值字段数量大于聚合函数数量,不合法")
            print(f"警告: 卡片 {card_id} {card_name}: 不添加任何数值字段.")
            return [], [], [], False

    # Consume aggregations through an iterator instead of pop(0), which
    # mutated the caller's list.
    # NOTE(review): when both lists have the same length and a calculated
    # field precedes a plain one, the plain field receives the aggregation at
    # the calculated field's position - confirm whether positional pairing is
    # intended in that case.
    pending_aggs = iter(measure_aggs)
    new_measure_fields, new_measure_aggs, measure_is_aggregated, agg_flag = [], [], [], False
    for field in measure_fields:
        if field not in calculation_fields:
            new_measure_fields.append(quote_identifier(field))
            new_measure_aggs.append(next(pending_aggs))
            measure_is_aggregated.append(False)
            continue

        definition = calculation_fields[field]
        formula = resolve_calculation_formula(
            definition["calculation"]["formula"], calculation_fields, {definition["field_id"]}
        )
        new_measure_fields.append(quote_identifier(formula, formula=True))
        new_measure_aggs.append("NUL")
        # NOTE(review): calculated fields are flagged True even when their
        # definition says isAggregated is not True; kept as found.
        measure_is_aggregated.append(True)
        if definition["calculation"]["isAggregated"] is True:
            agg_flag = True
    return new_measure_fields, new_measure_aggs, measure_is_aggregated, agg_flag
# De-duplicate SQL fragments.
def dedupe_sql_parts(parts):
    """Return the fragments without empty entries or repeats.

    The first occurrence of every fragment is kept and the original order is
    preserved. Comparison is by exact equality.
    """
    # dict keys are unique and keep insertion order.
    return list(dict.fromkeys(fragment for fragment in parts if fragment))
def quote_identifier(identifier, formula=False):
    """Quote a field name, or the ``[field]`` references inside a formula.

    Args:
        identifier: a field name, or formula text when ``formula`` is True.
        formula: treat ``identifier`` as a formula.

    Returns:
        * the input unchanged when quoting is disabled (``QUOTE_FLAG``);
        * ``''`` for a falsy input;
        * plain name: newlines become spaces, and the name is wrapped in the
          identifier quote unless it consists only of ASCII letters, digits and
          underscores and does not start with a digit;
        * formula: ``[DYNAMIC_PARAMS.x]`` becomes ``{{{DYNAMIC_PARAMS.x}}}``,
          every other ``[name]`` becomes a quoted name, and purely numeric
          brackets such as ``[2]`` (Hive subscripts) are left alone.
    """
    if not QUOTE_FLAG:
        return identifier
    if not identifier:
        return ''

    if not formula:
        flattened = identifier.replace('\n', ' ')
        if re.match(r'[a-zA-Z_][a-zA-Z0-9_]*$', flattened):
            return flattened
        return f'{IDENTIFIER_QUOTE}{flattened}{IDENTIFIER_QUOTE}'

    rewritten = identifier
    # Dynamic parameters first, so the generic bracket pass below no longer
    # sees them as field references.
    for placeholder in re.findall(r"\[DYNAMIC_PARAMS\.\w+\]", identifier):
        rewritten = rewritten.replace(placeholder, "{{{" + placeholder[1:-1] + "}}}", 1)

    def _quote_reference(bracketed):
        inner = bracketed.group(1)
        if re.fullmatch(r"\d+", inner.strip()):
            return bracketed.group(0)
        return IDENTIFIER_QUOTE + inner + IDENTIFIER_QUOTE

    return re.sub(r"\[([^\[\]]+)\]", _quote_reference, rewritten)
def parse_multi_value_field(field_value):
    """Parse a multi-value cell stored as a Python literal such as ``"['a', 'b']"``.

    Args:
        field_value: the literal text, an already parsed list/tuple, or an
            empty / missing value.

    Returns:
        ``[]`` for an empty or missing value, a list copy for a list/tuple
        input, otherwise whatever ``ast.literal_eval`` yields for the text.

    Raises:
        ValueError, SyntaxError (or another ``ast.literal_eval`` error): when
        the text is not a valid literal. The offending value and traceback are
        printed before the exception is re-raised.
    """
    if not field_value:
        return []
    if isinstance(field_value, (list, tuple)):
        # Already parsed upstream; literal_eval would reject it.
        return list(field_value)
    try:
        # Evaluate once and return that result; the earlier version discarded
        # it and evaluated the same text a second time.
        return ast.literal_eval(field_value)
    except Exception:
        print(field_value)
        print(traceback.format_exc())
        raise
# Build the template arguments for one filter operator.
def get_format_args(field, fd_type, op_dict, values):
    """Prepare the ``str.format`` arguments for an operator template.

    Values are rendered according to the field type:

    * numeric types: ``None`` and ``''`` are dropped, the rest is used as is
      (a literal ``0`` is kept);
    * DATE / STRING / SUB_DATE / TIMESTAMP: wrapped in single quotes unless
      the operator sets ``quote`` to False;
    * BOOL: upper-cased;
    * anything else: passed through.

    Args:
        field: SQL expression for the left-hand side.
        fd_type: field data type code.
        op_dict: operator definition with ``val_nums`` and optional ``quote``.
        values: list of filter values.

    Returns:
        dict with ``field`` plus, depending on ``val_nums``: ``values``
        (9, comma separated list), ``value_1``/``value_2`` (2), ``value`` (1),
        or nothing extra (any other count).

    Raises:
        IndexError: when fewer values remain than the operator needs.
    """
    if fd_type in ('DECIMAL', 'DOUBLE', 'INT', 'FLOAT', 'LONG', 'SHORT'):
        # Test for "missing" explicitly: a plain truthiness check would also
        # throw away a legitimate 0.
        rendered = [str(x) for x in values if x is not None and x != ""]
    elif fd_type in ('DATE', 'STRING', 'SUB_DATE', 'TIMESTAMP'):
        if op_dict.get('quote', True):
            rendered = [f"'{x}'" for x in values]
        else:
            rendered = list(values)
    elif fd_type == 'BOOL':
        # str() first so that real booleans work as well as "true"/"false".
        rendered = [str(value).upper() for value in values]
    else:
        rendered = list(values)

    format_dict = {}
    value_nums = op_dict['val_nums']
    if value_nums == 9:
        format_dict["values"] = ", ".join(str(item) for item in rendered)
    elif value_nums == 2:
        format_dict["value_1"] = rendered[0]
        format_dict["value_2"] = rendered[1]
    elif value_nums == 1:
        format_dict["value"] = rendered[0]
    format_dict["field"] = field
    return format_dict
# Render a consolidation (grouping) definition.
def get_consolidation_field(consolidation_dict):
    """Translate a grouping definition into a SQL ``CASE ... END`` expression.

    Supported ``groupType`` values:

    * ``ITEM``        - each group lists the member values (``selectedValues``);
    * ``CONDITION``   - each group combines filter conditions with
      ``combineType``;
    * ``CUSTOM_STEP`` - each group is a numeric range with its own operator;
    * ``FIXED_STEP``  - equal-width ranges from ``fixedStepSetting``, with
      ``ELSE NULL``.

    For the first three types, a group flagged ``isOtherGroup`` becomes the
    ELSE branch.

    Args:
        consolidation_dict: definition with ``sourceName``, ``groupType``,
            ``sourceFdType`` and either ``groups`` or ``fixedStepSetting``.

    Returns:
        The CASE expression text.

    Raises:
        ValueError: unknown ``groupType``, unknown CUSTOM_STEP operator, or a
            FIXED_STEP ``stepSize`` that is not positive.
    """
    field_name = quote_identifier(consolidation_dict["sourceName"])
    group_type = consolidation_dict["groupType"]
    fd_type = consolidation_dict["sourceFdType"]
    group_rules = consolidation_dict.get('groups')
    fixed_step = consolidation_dict.get('fixedStepSetting')
    when_part = []
    else_part = None

    if group_type == 'ITEM':
        for group in group_rules:
            group_name = group["groupName"]
            if group.get('isOtherGroup', False):
                else_part = f"ELSE '{group_name}'"
                continue
            selected_values = group.get('selectedValues') or []
            # NULL can never match inside IN (...), so it gets its own
            # predicate. The earlier "when_value is None" test ran on a string
            # and therefore never fired.
            members = [value for value in selected_values if value is not None]
            predicates = []
            if members:
                # repr() renders each member exactly as str(list) did.
                member_list = ", ".join(repr(value) for value in members)
                predicates.append(f"{field_name} IN ({member_list})")
            if len(members) != len(selected_values):
                predicates.append(f"{field_name} IS NULL")
            if not predicates:
                # A group without members would otherwise render "IN ()".
                continue
            condition = " OR ".join(predicates)
            when_part.append(f"WHEN {condition} THEN '{group_name}'")
    elif group_type == "CONDITION":
        for group in group_rules:
            group_name = group["groupName"]
            if group.get('isOtherGroup', False):
                else_part = f"ELSE '{group_name}'"
                continue
            rules = group["rules"]
            combine_type = " " + rules["combineType"] + " "  # padded for join
            cond_list = []
            for cond in rules["conditions"]:
                filter_type = cond["filterType"]
                filter_value = cond["filterValue"]
                op_dict = FILTER_OPERATOR_MAP[filter_type]
                format_args = get_format_args(field_name, fd_type, op_dict, [filter_value])
                cond_list.append(op_dict["template"].format(**format_args))
            when_part.append("WHEN " + combine_type.join(cond_list) + f" THEN '{group_name}'")
    elif group_type == "CUSTOM_STEP":
        range_operators = ('BT', 'OPEN_BT_CLOSE', 'OPEN_BT_OPEN', 'CLOSE_BT_OPEN')
        for group in group_rules:
            group_name = group["groupName"]
            if group.get('isOtherGroup', False):
                else_part = f"ELSE '{group_name}'"
                continue
            setting = group['customStepSetting']
            operator = setting['operator']
            start = setting['startValue']
            end = setting['endValue']
            if operator not in range_operators:
                raise ValueError(f"未知的操作符: {operator}")
            # The range templates of the shared operator table render the
            # same predicates that used to be spelled out here.
            condition = FILTER_OPERATOR_MAP[operator]["template"].format(
                field=field_name, value_1=start, value_2=end
            )
            when_part.append(f"WHEN {condition} THEN '{group_name}'")
    elif group_type == "FIXED_STEP":
        start = fixed_step['startValue']
        end = fixed_step['endValue']
        step = fixed_step['stepSize']
        if step <= 0:
            # The loop below would never terminate.
            raise ValueError(f"FIXED_STEP 的 stepSize 必须大于 0: {step}")
        # One half-open bucket per step.
        lower = start
        while lower < end:
            upper = lower + step
            when_part.append(f"WHEN {field_name} >= {lower} AND {field_name} < {upper} THEN '{lower}-{upper}'")
            lower = upper
        # Closing bucket.
        # NOTE(review): here lower >= end, so this only ever matches the single
        # value `end` (and nothing when the step overshoots) - confirm whether
        # the last range was meant to be inclusive instead.
        when_part.append(f"WHEN {field_name} >= {lower} AND {field_name} <= {end} THEN '{lower}-{end}'")
        else_part = "ELSE NULL"
    else:
        raise ValueError(f"未知的groupType: {group_type}")

    field = "CASE "
    field += "\n".join(when_part)
    if else_part:
        field += f"\n{else_part}"
    field += "\nEND"
    return field
def parse_filter_string(filter_relation_str, calculation_fields=None, window_alias_map=None, window_select_expressions=None):
    """Turn a card's filter configuration into SQL predicates.

    Args:
        filter_relation_str: JSON text of a list of condition objects. Each
            needs ``fdId``, ``name``, ``fdType``, ``filterType`` and
            ``filterValue``; ``formula``, ``consolidation``, ``advFilter`` and
            ``isAggregated`` are optional.
        calculation_fields: calculated-field definitions, used when window
            expressions are rewritten.
        window_alias_map: mutated in place; window expression -> alias.
        window_select_expressions: mutated in place; receives the window
            columns that have to be added to the WITH sub-query.

    Returns:
        ``{fdId: {"exp": <predicate>, "agg": <isAggregated>}}``. Conditions that
        cannot be interpreted are reported on stdout and skipped. A later
        condition with the same ``fdId`` replaces an earlier one.

    Raises:
        ValueError: when ``filter_relation_str`` is not valid JSON.

    NULL members of IN / NOT IN lists are rendered explicitly because NULL
    never matches inside a value list: ``IN`` becomes
    ``(<in-list> OR <field> IS NULL)`` and ``NI`` becomes
    ``(<not-in-list> AND <field> IS NOT NULL)``.
    """
    conditions = {}
    if not filter_relation_str or filter_relation_str == "[]":
        return conditions
    if calculation_fields is None:
        calculation_fields = {}
    if window_alias_map is None:
        window_alias_map = {}
    if window_select_expressions is None:
        window_select_expressions = []

    raw_conditions = json.loads(filter_relation_str)
    for cond_dict in raw_conditions:
        fdId = cond_dict.get("fdId")
        field = cond_dict.get("name")
        fd_type = cond_dict.get("fdType")
        op_name = cond_dict.get("filterType")
        op_dict = FILTER_OPERATOR_MAP.get(op_name)
        values = cond_dict.get("filterValue")  # list
        is_aggregated = cond_dict.get("isAggregated", False)

        # Validate the condition.
        if any(item is None for item in (fdId, field, fd_type, op_name, values)):
            print(f"fdId: {fdId} field: {field} fd_type: {fd_type} op_name: {op_name} values: {values}")
            print("警告: 无法解析筛选条件,缺少必须字段,跳过此条件。")
            continue
        if op_dict is None:
            print(f"警告: 无法解析筛选条件,未定义的筛选类型: {op_name},跳过此条件。")
            continue

        # Marker operators with their own rendering.
        if op_dict == 'CUSTOM':
            if "advFilter" not in cond_dict:
                print("警告: CUSTOM筛选类型不存在advFilter, 跳过此条件。")
                continue
            if 'formula' in cond_dict:
                field = quote_identifier(cond_dict['formula'], formula=True)
                # Rewrite window functions first so that no over(...) is left
                # in the WHERE clause.
                field = rewrite_window_max_over(field, calculation_fields, window_alias_map, window_select_expressions)
            else:
                field = quote_identifier(cond_dict['name'])
            expression = ADV_FILTER_EXP_MAP.get(cond_dict["advFilter"])
            if not expression:
                print(f"警告: CUSTOM筛选类型出现未定义的advFilter: {cond_dict['advFilter']}, 跳过此条件。")
                continue
            conditions[fdId] = {"exp": expression.format(field=field), "agg": is_aggregated}
            continue
        if op_dict == 'SPARK_EXPR':
            if 'formula' in cond_dict:
                formula = quote_identifier(cond_dict['formula'], formula=True)
                # A raw expression may contain window functions as well.
                formula = rewrite_window_max_over(formula, calculation_fields, window_alias_map, window_select_expressions)
                conditions[fdId] = {"exp": formula, "agg": is_aggregated}
            elif isinstance(cond_dict['filterValue'], list) and len(cond_dict['filterValue']) == 1:
                field = quote_identifier(cond_dict['name'])
                value = cond_dict['filterValue'][0]
                conditions[fdId] = {"exp": f"{field} = {value}", "agg": is_aggregated}
            else:
                print("警告: 无法解析筛选条件,SPARK_EXPR中未定义。跳过此条件。")
            continue

        # Regular operators.
        value_nums = op_dict["val_nums"]
        if value_nums != 9 and len(values) != value_nums:
            print("警告: 无法解析筛选条件,值数量与操作符不匹配。跳过此条件。")
            continue
        field = quote_identifier(field)
        if "consolidation" in cond_dict:
            # The grouping expression replaces the left-hand side.
            consolidation_field = get_consolidation_field(cond_dict["consolidation"])
            if not consolidation_field:
                print("警告: 无法解析consolidation字段。跳过此条件。")
                continue
            field = consolidation_field
        elif "formula" in cond_dict:
            field = quote_identifier(cond_dict["formula"], formula=True)
            field = rewrite_window_max_over(field, calculation_fields, window_alias_map, window_select_expressions)

        is_list_operator = op_name in ("NI", "IN")
        if is_list_operator and len(values) == 0:
            print("警告: 无法解析筛选条件,IN或NI中参数个数为0。跳过此条件。")
            continue

        # NULL members need a predicate of their own.
        null_predicate = None
        if is_list_operator and None in values:
            null_predicate = f"{field} IS NULL" if op_name == "IN" else f"{field} IS NOT NULL"
            values = [x for x in values if x is not None]
            if len(values) == 0:
                conditions[fdId] = {"exp": null_predicate, "agg": is_aggregated}
                continue

        # Fill the operator template.
        format_args = get_format_args(field, fd_type, op_dict, values)
        condition_str = op_dict["template"].format(**format_args)
        if null_predicate:
            # Keep the NULL predicate instead of overwriting it with the list.
            joiner = " OR " if op_name == "IN" else " AND "
            condition_str = f"({condition_str}{joiner}{null_predicate})"
        conditions[fdId] = {"exp": condition_str, "agg": is_aggregated}
    return conditions
- def build_sql_query(card_data, added_fields_info, dataset_fid_name_map):
- card_id = card_data["card_id"]
- card_name = card_data["card_name"]
- dataset_id = card_data.get("ds_id")
- if not dataset_id:
- print(f"错误: {card_id} {card_name} 数据集ID为空.")
- return "", "", "", ""
-
- added_fields_info = get_added_fields_info(added_fields_info)
- dataset_fid_name_map = get_fid_name_map(dataset_fid_name_map)
- dimension_fids = parse_multi_value_field(card_data.get("field_id", []))
- dimension_fields = parse_multi_value_field(card_data.get("field_name", []))
- dimension_fid_name_map = dict(zip(dimension_fids, dimension_fields))
- dimension_name_fid_map = dict(zip(dimension_fields, dimension_fids))
- measure_fids = parse_multi_value_field(card_data.get("num_value_field_id", []))
- measure_fields = parse_multi_value_field(card_data.get("num_value_field_name", []))
- # 处理用于转置行列的特殊无ID“度量名”字段
- if "度量名" in dimension_fields and len(dimension_fields) == len(dimension_fids) + 1:
- dimension_fields.remove("度量名")
- measure_aggs = parse_multi_value_field(card_data.get("num_value_field_merge_way", []))
- filter_relation_str = card_data.get("filters_field_value_name_rela")
- sort_fids = parse_multi_value_field(card_data.get("sort_field_id", []))
- sort_fields = parse_multi_value_field(card_data.get("sort_field_name", []))
- sort_method = parse_multi_value_field(card_data.get("sort_way", []))
- all_field_ids = dimension_fids + \
- parse_multi_value_field(card_data.get("filters_field_id", [])) + \
- sort_fids + \
- measure_fids
- all_field_names = dimension_fields + \
- parse_multi_value_field(card_data.get("filters_field_name", [])) + \
- sort_fields + \
- measure_fields
- all_field_id_name_map = dict(zip(all_field_ids, all_field_names))
- # 处理字段重命名关系
- fields_rename_map = get_fields_rename_map(card_data.get("field_info", ""))
- # 处理field_id与重命名关系,用于筛选Order by子句中的字段
- # 需要处理的只有日期转换类型,将转换前的原始字段名加入map
- # 只需要更新有重命名的字段即可
- selected_fid_alias_map = dict(zip(dimension_fids+measure_fids, dimension_fields+measure_fields))
- # 构建SELECT
- select_parts = []
- has_aggregation = False
- non_aggregated_select_parts = []
-
- # 添加维度字段
- for field in dimension_fields:
- fid = dimension_name_fid_map[field]
- alias = fields_rename_map.get(field)
- if alias and alias != "null":
- select_parts.append(f"{quote_identifier(field)} AS {quote_identifier(alias)}")
- selected_fid_alias_map[fid] = alias
- else:
- select_parts.append(f"{quote_identifier(field)}")
- selected_fid_alias_map[fid] = field
-
- # 加工计算字段
- new_measure_fields, measure_aggs, measure_is_aggregated, agg_flag = process_measure_fields(measure_fields, measure_aggs, added_fields_info, card_id, card_name)
- if agg_flag:
- has_aggregation = True
- for i, field in enumerate(new_measure_fields):
- fid = measure_fids[i]
- alias = fields_rename_map.get(field.strip('`'))
- # measure_agg是NUL,不需要聚合(等同于维度字段)或公式本身已经有聚合函数
- agg_func_template = AGGREGATION_MAP.get(measure_aggs[i])
- if not agg_func_template:
- if not alias or alias == "null":
- alias = measure_fields[i]
- select_parts.append(f"{field} AS {quote_identifier(alias)}")
- # 属于计算字段,但没有聚合函数,等同于维度字段,需要加入groupbyby。
- if not measure_is_aggregated[i] and field and re.search(AGGREGATION_PATTERN, field) is None:
- if re.match(r"\d+", field):
- non_aggregated_select_parts.append(quote_identifier(field))
- else:
- non_aggregated_select_parts.append(field)
- non_aggregated_select_parts.append(field)
- selected_fid_alias_map[fid] = alias
- else:
- has_aggregation = True
- # 特殊处理 count distinct
- if '{}' in agg_func_template:
- agg_expression = agg_func_template.format(field)
- else:
- agg_expression = f"{agg_func_template}({field})"
- # 添加别名
- if not alias or alias == "null":
- suffix = AGGREGATION_SUFFIX_MAP.get(measure_aggs[i])
- alias = f"{measure_fields[i]}_{suffix}"
- select_parts.append(f"{agg_expression} AS {quote_identifier(alias)}")
- selected_fid_alias_map[fid] = alias
-
- # BI 卡片配置里可能存在重复字段。这里只对完全相同的 SELECT 表达式去重,保留表达式不同但别名相同的情况。
- select_parts = dedupe_sql_parts(select_parts)
- if not select_parts:
- print(f"错误: {card_id} {card_name} 没有select字段。")
- return '', '', '', ''
- else:
- select_clause = "SELECT " + ",\n ".join(select_parts)
-
- # 构建WHERE
- filter_conditions = {}
- window_alias_map = {}
- window_select_expressions = []
- try:
- # parse_filter_string 会顺便收集需要下推到 WITH 的窗口函数表达式。
- filter_conditions = parse_filter_string(filter_relation_str, added_fields_info, window_alias_map, window_select_expressions)
- except Exception as e:
- print(f"错误: 卡片 {card_id} {card_name} 解析筛选条件出错:{e}。WHERE字句缺失。")
- print("详细错误信息:")
- print(traceback.format_exc())
- # 构建WITH
- with_part = ""
- new_date_fields = []
- # 日期转换
- for fid, name in all_field_id_name_map.items():
- fid_splits = fid.split('_')
- if len(fid_splits) == 2:
- new_date_fields.append((fid, name))
- old_fid = fid_splits[0]
- selected_fid_alias_map[old_fid] = name
- # 新增维度字段
- new_dimension_fields = []
- for fid, name in dimension_fid_name_map.items():
- if fid in added_fields_info:
- new_dimension_fields.append((fid, name))
- # 只要存在派生日期、计算维度或窗口筛选中的任一情况,就需要 WITH。
- if new_date_fields or new_dimension_fields or window_select_expressions:
- required_base_fields = collect_with_base_fields(
- all_field_names,
- measure_fields,
- new_date_fields,
- new_dimension_fields,
- dataset_fid_name_map,
- added_fields_info,
- filter_relation_str,
- )
- with_part = build_with_part(
- new_date_fields,
- new_dimension_fields,
- dataset_fid_name_map,
- added_fields_info,
- dataset_id,
- required_base_fields,
- window_select_expressions,
- )
- # 构建FROM
- if with_part:
- from_clause = "FROM tmp"
- else:
- from_clause = f"FROM {quote_identifier(str(dataset_id))}"
- # 构建GROUPBY
- group_by_clause = ""
- if has_aggregation:
- group_by_parts = [quote_identifier(field) for field in dimension_fields]
- group_by_parts.extend(non_aggregated_select_parts)
- group_by_parts = dedupe_sql_parts(group_by_parts)
- if group_by_parts:
- group_by_clause = "GROUP BY " + ", ".join(group_by_parts)
-
- # 构建ORDERBY
- order_by_clause = ""
- if sort_fields and sort_method and len(sort_fields) == len(sort_method):
- order_by_parts = []
- for i, field in enumerate(sort_fields):
- fid = sort_fids[i]
- if fid not in selected_fid_alias_map:
- continue
- alias = selected_fid_alias_map[fid]
- order_by_parts.append(f"{quote_identifier(alias)} {sort_method[i]}")
- if order_by_parts:
- order_by_clause = "ORDER BY " + ", ".join(order_by_parts)
-
- # 组装SQL
- sql_parts = [with_part, select_clause, from_clause]
- # 返回 select, where, groupby, orderby
- return ("\n".join(sql_parts)).strip(), json.dumps(filter_conditions, ensure_ascii=False), group_by_clause, order_by_clause
def generate(start=None, end=None, test_card_id=None):
    """Build SQL fragments for the chart cards listed in ``data/card.csv``.

    Loads the card list, the per-card calculated-field table
    (``data/calc.csv``, indexed by ``card_id``) and the per-dataset field
    table (``data/field.csv``, indexed by ``ds_id``), then calls
    ``build_sql_query`` once for every eligible card.

    A card is skipped when its ``card_type_cd`` is not '图表', when its
    ``ds_id`` is empty, or when its dataset has no rows in the field table.

    Args:
        start: Optional first row position (inclusive). Rows before it are
            skipped. ``None`` means no lower bound.
        end: Optional last row position (inclusive). Iteration stops at the
            first row past it. ``None`` means no upper bound; ``0`` is a
            real bound and limits processing to the first row.
        test_card_id: When truthy, only the card whose ``card_id`` equals
            this value is processed.

    Returns:
        A ``pandas.DataFrame`` with the columns ``card_id``, ``card_name``,
        ``select``, ``where``, ``groupby`` and ``orderby``, one row per card
        for which a non-empty SELECT statement was produced.
    """
    res_list = []
    df = pd.read_csv("data/card.csv").fillna("").reset_index()
    add_field_info = pd.read_csv("data/calc.csv").fillna('').set_index("card_id")
    all_field_info = pd.read_csv("data/field.csv").fillna('').set_index("ds_id")
    for i, row in df.iterrows():
        # Compare with None (not truthiness) so a bound of 0 is honoured
        # instead of being mistaken for "no bound".
        if start is not None and i < start:
            continue
        if end is not None and i > end:
            break
        card_id = row["card_id"]
        if test_card_id and card_id != test_card_id:
            continue
        if row["card_type_cd"] != '图表' or row["ds_id"] == "":
            continue
        # A card without calculated fields is normal: fall back to an empty frame.
        try:
            added_fields_info = add_field_info.loc[[card_id]]
        except KeyError:
            added_fields_info = pd.DataFrame()
        # A card whose dataset has no field metadata cannot be translated.
        try:
            dataset_fid_name_map = all_field_info.loc[[row["ds_id"]]]
        except KeyError:
            print(f"错误: 没有数据及字段信息: {card_id}")
            continue
        # Pre-initialise so that a failure inside build_sql_query leaves
        # `select` empty and the card is reported as failed below.
        select, where, groupby, orderby = '', '', '', ''
        try:
            select, where, groupby, orderby = build_sql_query(row, added_fields_info, dataset_fid_name_map)
        except Exception as e:
            # Batch boundary: report the failure and keep processing other cards.
            print(f"错误: 卡片 {card_id} 发生未知错误: {e}")
            print(i, traceback.format_exc())
        if not select:
            print(f"{card_id} 生成失败")
            continue
        res_list.append([str(card_id), str(row["card_name"]), select, where, groupby, orderby])
    res_df = pd.DataFrame(res_list, columns=["card_id", "card_name", "select", 'where', 'groupby', 'orderby'])
    return res_df
if __name__ == "__main__":
    # Script entry point: build the SQL table for every card once, then
    # persist the same frame in both Parquet and Excel form.
    sql_df = generate()
    sql_df.to_parquet("output/sql.parquet")
    sql_df.to_excel("output/sql.xlsx")
|