# carddef2sql.py
  1. import ast
  2. import json
  3. import re
  4. import sys
  5. import traceback
  6. import pandas as pd
  7. # 聚合函数
  8. AGGREGATION_MAP = {
  9. 'SUM': 'SUM',
  10. 'AVG': 'AVG',
  11. 'CNT': 'COUNT',
  12. 'MAX': 'MAX',
  13. 'MIN': 'MIN',
  14. 'CNT_DISTINCT': 'COUNT(DISTINCT {})'
  15. }
  16. AGGREGATION_SUFFIX_MAP = {
  17. 'SUM': '求和',
  18. 'AVG': '均值',
  19. 'CNT': '计数',
  20. 'MAX': '最大值',
  21. 'MIN': '最小值',
  22. 'CNT_DISTINCT': '去重计数'
  23. }
  24. # 筛选操作符
  25. FILTER_OPERATOR_MAP = {
  26. 'BT': {"val_nums": 2, 'template': "{field} BETWEEN {value_1} AND {value_2}"},
  27. 'CLOSE_BT_OPEN': {"val_nums": 2, 'template': "{field} >= {value_1} AND {field} < {value_2}"},
  28. 'EQ': {"val_nums": 1, 'template': "{field} = {value}"},
  29. 'GE': {"val_nums": 1, 'template': "{field} >= {value}"},
  30. 'GT': {"val_nums": 1, 'template': "{field} > {value}"},
  31. 'IN': {"val_nums": 9, 'template': "{field} IN ({values})"},
  32. 'IS_NULL': {"val_nums": 0, 'template': "{field} IS NULL"},
  33. 'LE': {"val_nums": 1, 'template': "{field} <= {value}"},
  34. 'LT': {"val_nums": 1, 'template': "{field} < {value}"},
  35. 'NE': {"val_nums": 1, 'template': "{field} != {value}"},
  36. 'NI': {"val_nums": 9, 'template': "{field} NOT IN ({values})"},
  37. 'NOT_NULL': {"val_nums": 0, 'template': "{field} IS NOT NULL"},
  38. 'OPEN_BT_CLOSE': {"val_nums": 2, 'template': "{field} > {value_1} AND {field} <= {value_2}"},
  39. 'OPEN_BT_OPEN': {"val_nums": 2, 'template': "{field} > {value_1} AND {field} < {value_2}"},
  40. 'CONTAINS': {"val_nums": 1, 'template': "{field} LIKE '%{value}%'"},
  41. 'NOT_CONTAINS': {"val_nums": 1, 'template': "{field} NOT LIKE '%{value}%'"},
  42. 'STARTS_WITH': {"val_nums": 1, 'template': "{field} LIKE '{value}%'"},
  43. 'NOT_STARTS_WITH': {"val_nums": 1, 'template': "{field} NOT LIKE '{value}%'"},
  44. 'ENDS_WITH': {"val_nums": 1, 'template': "{field} LIKE '%{value}'"},
  45. 'NOT_ENDS_WITH': {"val_nums": 1, 'template': "{field} NOT LIKE '%{value}'"},
  46. 'CUSTOM': 'CUSTOM',
  47. 'SPARK_EXPR': 'SPARK_EXPR'
  48. }
  49. # 引用
  50. IDENTIFIER_QUOTE = '`'
  51. QUOTE_FLAG = True
  52. # 副词
  53. ADV_FILTER_EXP_MAP = {
  54. 'TODAY': "{field} = '{{today}}'",
  55. 'YESTERDAY': "{field} = date_sub('{{today}}', 1)",
  56. 'DAY_BEFORE_YESTERDAY': "{field} = date_sub('{{today}}', 2)",
  57. 'LAST_14_DAY': "{field} between date_sub('{{today}}', 13) and '{{today}}'",
  58. 'LAST_1_YEAR': "{field} between date_sub('{{today}}', 364) and '{{today}}'",
  59. 'LAST_30_DAY': "{field} between date_sub('{{today}}', 29) and '{{today}}'",
  60. 'LAST_7_DAY': "{field} between date_sub('{{today}}', 6) and '{{today}}'",
  61. 'LAST_90_DAY': "{field} between date_sub('{{today}}', 89) and '{{today}}'",
  62. 'LAST_MONTH': "{field} between datesub(add_months('{{today}}', -1), day('{{today}}') - 1) and date_sub('{{today}}', day('{{today}}'))",
  63. 'LAST_WEEK': "{field} between date_sub('{{today}}', case when dayofweek('{{today}}') = 1 then 13 else dayofweek('{{today}}')+5 end) and date_sub('{{today}}', case when dayofweek('{{today}}') = 1 then 7 else dayofweek('{{today}}')-1 end)",
  64. 'MONTH_BEFORE_LAST_MONTH': "{field} between date_sub(add_months('{{today}}', -2), day(add_months('{{today}}', -2))-1) and date_sub(add_months('{{today}}', -1), day(add_months('{{today}}', -1)))",
  65. 'MONTH_TO_DAY': "{field} between date_sub('{{today}}', day('{{today}}')-1) and '{{today}}'",
  66. 'MONTH_TO_YESTERDAY': "{field} between date_sub('{{today}}', day('{{today}}')-1) and date_sub('{{today}}', 1)",
  67. 'QUARTER_TO_DAY': """{field} between concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01'
  68. when quarter('{{today}}') = 2 then '-04-01'
  69. when quarter('{{today}}') = 3 then '-07-01'
  70. when quarter('{{today}}') = 4 then '-10-01' end) and '{{today}}'""",
  71. 'QUARTER_TO_YESTERDAY': """{field} between concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01'
  72. when quarter('{{today}}') = 2 then '-04-01'
  73. when quarter('{{today}}') = 3 then '-07-01'
  74. when quarter('{{today}}') = 4 then '-10-01' end) and date_sub('{{today}}', 1)""",
  75. 'WEEK_TO_DAY': "{field} between date_sub('{{today}}', (dayofweek('{{today}}')+5)%7) and '{{today}}'",
  76. 'WEEK_TO_YESTERDAY': "{field} between date_sub('{{today}}', (dayofweek('{{today}}')+5)%7) and date_sub('{{today}}', 1)",
  77. 'YEAR_TO_DAY': "{field} between concat(year('{{today}}'), '-01-01') and '{{today}}'",
  78. 'YEAR_TO_LAST_MONTH': "{field} between concat(year('{{today}}'), '-01-01') and date_sub('{{today}}', day('{{today}}'))",
  79. 'YEAR_TO_LAST_QUARTER': """{field} between concat(year('{{today}}'), '-01-01') and date_sub(concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01'
  80. when quarter('{{today}}') = 2 then '-04-01'
  81. when quarter('{{today}}') = 3 then '-07-01'
  82. when quarter('{{today}}') = 4 then '-10-01' end), 1)""",
  83. 'YEAR_TO_YESTERDAY': "{field} between concat(year('{{today}}'), '-01-01') and date_sub('{{today}}', 1)",
  84. }
  85. # 自带日期转换
  86. PARTIAL_DATE_EXPRESSION = {
  87. 'day': "`{old}` as `{new}`",
  88. 'month': "concat(year(`{old}`), '-', lpad(month(`{old}`), 2, '0') as `{new}`",
  89. 'year': "year(`{old}`) as `{new}`",
  90. 'quarter': "concat(year(`{old}`), '-S', quarter(`{old}`)) as `{new}`",
  91. 'week': "weekofyear(`{old}`) as `{new}`",
  92. 'dayofweek': "dayofweek(`{old}`)-1 as `{new}`",
  93. 'hour': "hour(`{old}`)-8 as `{new}`",
  94. 'minute': "minute(`{old}`)-8 as `{new}`",
  95. }
  96. # 获取新增字段
  97. def get_added_fields_info(added_fields_df):
  98. if added_fields_df.empty:
  99. return {}
  100. added_fields_info = {}
  101. for _, row in added_fields_df.iterrows():
  102. try:
  103. content = json.loads(row['calc_field_logic'])
  104. except:
  105. print(f'ERROR: 新增字段解析错误: {row["calc_field_logic"]}')
  106. continue
  107. field_name = content['name']
  108. field_id = content['fdId']
  109. added_fields_info[field_id] = {"field_name": field_name, 'calculation': content}
  110. added_fields_info[field_name] = {"field_id": field_id, 'calculation': content}
  111. return added_fields_info
  112. def get_fid_name_map(field_def_df):
  113. field_id_name = {}
  114. for i, row in field_def_df.iterrows():
  115. field_id_name[row['field_id']] = row['field_name']
  116. return field_id_name
  117. # 字段映射关系
  118. def get_fields_rename_map(field_info):
  119. ret = {}
  120. try:
  121. tmp_map = json.loads(field_info)
  122. except:
  123. return ret
  124. dimensions, metrics = tmp_map.get("dimensions"), tmp_map.get("metrics")
  125. if dimensions and dimensions != 'null':
  126. for one_map in dimensions:
  127. ret[one_map["name"]] = one_map["alias"]
  128. if metrics and metrics != 'null':
  129. for one_map in metrics:
  130. ret[one_map["name"]] = one_map["alias"]
  131. return ret
  132. # 处理过滤条件中的consolidation
  133. def get_consolidation_field(consolidation_dict):
  134. field_name = quote_identifier(consolidation_dict["sourceName"])
  135. group_type = consolidation_dict["groupType"]
  136. fd_type = consolidation_dict["sourceFdType"]
  137. group_rules = consolidation_dict.get('groups')
  138. fixed_step = consolidation_dict.get('fixedStepSetting')
  139. when_part = []
  140. else_part = None
  141. if group_type == 'ITEM':
  142. for group in group_rules:
  143. group_name = group["groupName"]
  144. if group.get('isOtherGroup', False):
  145. else_part = f"ELSE '{group_name}'"
  146. else:
  147. selected_values = group.get('selectedValues', [])
  148. when_value = str(selected_values)
  149. when_value = when_value[1:-1] # 去除中括号
  150. if when_value is None:
  151. when_part.append(f"WHEN {field_name} IS NULL THEN '{group_name}'")
  152. else:
  153. when_part.append(f"WHEN {field_name} IN ({when_value}) THEN '{group_name}'")
  154. elif group_type == "CONDITION":
  155. for group in group_rules:
  156. group_name = group["groupName"]
  157. if group.get('isOtherGroup', False):
  158. else_part = f"ELSE '{group_name}'"
  159. else:
  160. rules = group["rules"]
  161. cond_list = []
  162. combine_type = " " + rules["combineType"] + " " # 加空格以便join
  163. for cond in rules["conditions"]:
  164. filter_type = cond["filterType"]
  165. filter_value = cond["filterValue"]
  166. op_dict = FILTER_OPERATOR_MAP[filter_type]
  167. format_args = get_format_args(field_name, fd_type, op_dict, [filter_value])
  168. cond_str = op_dict["template"].format(**format_args)
  169. cond_list.append(cond_str)
  170. when_str = "WHEN " + combine_type.join(cond_list) + f" THEN '{group_name}'"
  171. when_part.append(when_str)
  172. elif group_type == "CUSTOM_STEP":
  173. for group in group_rules:
  174. group_name = group["groupName"]
  175. if group.get('isOtherGroup', False):
  176. else_part = f"ELSE '{group_name}'"
  177. else:
  178. setting = group['customStepSetting']
  179. operator = setting['operator']
  180. start = setting['startValue']
  181. end = setting['endValue']
  182. condition = ''
  183. if operator == 'BT':
  184. condition = f"{field_name} BETWEEN {start} AND {end}"
  185. elif operator == 'OPEN_BT_CLOSE':
  186. condition = f"{field_name} > {start} AND {field_name} <= {end}"
  187. elif operator == 'OPEN_BT_OPEN':
  188. condition = f"{field_name} > {start} AND {field_name} < {end}"
  189. elif operator == 'CLOSE_BT_OPEN':
  190. condition = f"{field_name} >= {start} AND {field_name} < {end}"
  191. else:
  192. raise ValueError(f"未知的操作符: {operator}")
  193. when_part.append(f"WHEN {condition} THEN '{group_name}'")
  194. elif group_type == "FIXED_STEP":
  195. start = fixed_step['startValue']
  196. end = fixed_step['endValue']
  197. step = fixed_step['stepSize']
  198. # 生成每个区间的case when部分
  199. lower = start
  200. while lower < end:
  201. upper = lower + step
  202. case_part = f"WHEN {field_name} >= {lower} AND {field_name} < {upper} THEN '{lower}-{upper}'"
  203. when_part.append(case_part)
  204. lower = upper
  205. # 处理最后一个区间
  206. case_part = f"WHEN {field_name} >= {lower} AND {field_name} <= {end} THEN '{lower}-{end}'"
  207. when_part.append(case_part)
  208. else_part = "ELSE NULL"
  209. else:
  210. raise ValueError(f"未知的groupType: {group_type}")
  211. field = "CASE "
  212. field += "\n".join(when_part)
  213. if else_part:
  214. field += f"\n{else_part}"
  215. field += "\nEND"
  216. return field
  217. def build_with_part(new_date_fields, new_dimension_fields, dataset_fid_name_map, added_fields_info, dataset_id):
  218. sql_part = 'WITH tmp as (\nSELECT *,\n'
  219. with_expressions = []
  220. for fid, new_name in new_date_fields:
  221. old_fid, partial_date = fid.split('_')
  222. if old_fid in dataset_fid_name_map:
  223. old_name = dataset_fid_name_map[old_fid]
  224. elif old_fid in added_fields_info:
  225. old_name = added_fields_info[old_fid]["calculation"]["formula"].replace("[]", "").replace("]", '')
  226. else:
  227. raise ValueError(f"字段 {fid} {new_name} 不存在")
  228. tmp_part = PARTIAL_DATE_EXPRESSION.get(partial_date, None)
  229. if tmp_part:
  230. tmp_part = tmp_part.format(old=old_name, new=new_name)
  231. with_expressions.append(tmp_part)
  232. else:
  233. raise ValueError(f"日期转换方式 {partial_date} 不存在")
  234. for fid, new_name in new_dimension_fields:
  235. field_def = added_fields_info[fid]
  236. new_name = field_def["field_name"]
  237. formula = field_def["calculation"]["formula"]
  238. if "consolidation" in formula:
  239. consolidation_dict = json.loads(formula)["consolidation"]
  240. tmp_part = get_consolidation_field(consolidation_dict)
  241. tmp_part += f" AS `{new_name}`"
  242. else:
  243. tmp_part = quote_identifier(formula, formula=True) + f" AS `{new_name}`"
  244. with_expressions.append(tmp_part)
  245. sql_part += ',\n'.join(with_expressions)
  246. sql_part += f"\nFROM `{dataset_id}\n`"
  247. return sql_part
  248. # 处理计算字段
  249. def process_calculation_fields(measure_fields, measure_aggs, calculation_fields, card_id, card_name):
  250. ## 数值字段数量 小于 聚合函数数量,不合法
  251. if len(measure_fields) < len(measure_aggs):
  252. print(f"警告: 卡片 {card_id} {card_name}: 数值字段数量小于聚合函数数量,不合法")
  253. print(f"警告: 卡片 {card_id} {card_name}: 不添加任何数值字段.")
  254. return [], [], False
  255. ## 数值字段 大于 聚合函数数量,存在聚合类型的计算字段,尝试填充
  256. elif len(measure_fields) > len(measure_aggs):
  257. ## 计算数值字段数量
  258. num_cals = 0
  259. for field in measure_fields:
  260. if field in calculation_fields and calculation_fields[field]["calculation"]["isAggregated"] is True:
  261. num_cals += 1
  262. ## 如果不存在任何计算字段,补全剩余的NUL聚合函数
  263. if num_cals == 0:
  264. measure_aggs.extend(['NULL'] * (len(measure_fields) - len(measure_aggs)))
  265. return measure_fields, measure_aggs, True
  266. ## 如果存在计算字段,且相加后的 聚合函数数量 仍小于 数值字段数量,不合法
  267. if num_cals + len(measure_aggs) != len(measure_fields):
  268. print(f"警告: 卡片 {card_id} {card_name}: 数值字段数量大于聚合函数数量,不合法")
  269. print(f"警告: 卡片 {card_id} {card_name}: 不添加任何数值字段.")
  270. return [], [], False
  271. ## 通过验证,填充聚合函数
  272. new_measure_fields, new_measure_aggs, agg_flag = [], [], False
  273. for i, field in enumerate(measure_fields):
  274. ## 非计算字段
  275. if field not in calculation_fields:
  276. new_measure_fields.append(quote_identifier(field))
  277. new_measure_aggs.append(measure_aggs.pop(0))
  278. ## 计算字段
  279. else:
  280. formula = calculation_fields[field]["calculation"]["formula"]
  281. formula = formula.replace('\n', '')
  282. new_measure_fields.append(quote_identifier(formula, formula=True))
  283. if calculation_fields[field]["calculation"]["isAggregated"] is True:
  284. new_measure_aggs.append("NUL")
  285. agg_flag = True
  286. else:
  287. new_measure_aggs.append(measure_aggs.pop(0))
  288. return new_measure_fields, new_measure_aggs, agg_flag
  289. def quote_identifier(identifier, formula=False):
  290. if not QUOTE_FLAG:
  291. return identifier
  292. if not identifier:
  293. return ''
  294. # 简单处理,如果包含非字母数字下划线或可能是关键字,则加反引号
  295. # 更复杂的关键字检查可以添加
  296. if formula:
  297. params = re.findall(r"\[DYNAMIC_PARAMS\.\w+\]", identifier)
  298. for p in params:
  299. subs = p[1:-1]
  300. subs = "{{{"+subs+"}}}"
  301. identifier = identifier.replace(p, subs, 1)
  302. identifier = identifier.replace('[', IDENTIFIER_QUOTE).replace(']', IDENTIFIER_QUOTE)
  303. else:
  304. identifier = identifier.replace('\n', ' ')
  305. if not re.match(r'[a-zA-Z_][a-zA-Z0-9_]*$', identifier):
  306. return f'{IDENTIFIER_QUOTE}{identifier}{IDENTIFIER_QUOTE}'
  307. return identifier
  308. def parse_multi_value_field(field_value):
  309. # 解析包含多个值的字段
  310. if not field_value or field_value == "":
  311. return []
  312. try:
  313. res = ast.literal_eval(field_value)
  314. except Exception:
  315. print(field_value)
  316. print(traceback.format_exc())
  317. return ast.literal_eval(field_value)
  318. # 处理过滤条件的操作符
  319. def get_format_args(field, fd_type, op_dict, values):
  320. # 按照数据类型及操作符,判断是否需要加引号
  321. if fd_type in ('DECIMAL', 'DOUBLE', 'INT', 'FLOAT', 'LONG', 'SHORT'):
  322. values = [x for x in values if x]
  323. elif fd_type in ('DATE', 'STRING', 'SUB_DATE', 'TIMESTAMP'):
  324. if op_dict.get('quote', True):
  325. values = [f"'{x}'" for x in values]
  326. elif fd_type == 'BOOL':
  327. values = [value.upper() for value in values]
  328. else:
  329. pass
  330. # 按照操作符所需参数个数构造format参数
  331. format_dict = {}
  332. value_nums = op_dict['val_nums']
  333. if value_nums == 9:
  334. format_dict.update(**{"values": ", ".join(values)})
  335. elif value_nums == 2:
  336. format_dict.update(**{"value_1": values[0], "value_2": values[1]})
  337. elif value_nums == 1:
  338. format_dict.update(**{"value": values[0]})
  339. else:
  340. pass
  341. format_dict["field"] = field
  342. return format_dict
  343. def parse_filter_string(filter_relation_str):
  344. conditions = {}
  345. if not filter_relation_str or filter_relation_str == "[]":
  346. return conditions
  347. raw_conditions = json.loads(filter_relation_str)
  348. for cond_dict in raw_conditions:
  349. fdId = cond_dict.get("fdId")
  350. field = cond_dict.get("name")
  351. fd_type = cond_dict.get("fdType")
  352. op_name = cond_dict.get("filterType")
  353. op_dict = FILTER_OPERATOR_MAP.get(op_name)
  354. values = cond_dict.get("filterValue") # list
  355. is_aggregated = cond_dict.get("isAggregated", False)
  356. # 检查条件合法
  357. if any([fdId is None, field is None, fd_type is None, op_name is None, values is None]):
  358. print(f"fdId: {fdId} field: {field} fd_type: {fd_type} op_name: {op_name} values: {values}")
  359. print(f"警告: 无法解析筛选条件,缺少必须字段,跳过此条件。")
  360. continue
  361. if op_dict is None:
  362. print(f"警告: 无法解析筛选条件,未定义的筛选类型: {op_name},跳过此条件。")
  363. continue
  364. # 特殊操作符
  365. if op_dict == 'CUSTOM':
  366. if "advFilter" not in cond_dict:
  367. print(f"警告: CUSTOM筛选类型不存在advFilter, 跳过此条件。")
  368. continue
  369. if 'formula' in cond_dict:
  370. field = quote_identifier(cond_dict['formula'], formula=True)
  371. else:
  372. field = quote_identifier(cond_dict['name'])
  373. expression = ADV_FILTER_EXP_MAP.get(cond_dict["advFilter"])
  374. if not expression:
  375. print(f"警告: CUSTOM筛选类型出现未定义的advFilter: {cond_dict['advFilter']}, 跳过此条件。")
  376. continue
  377. expression = expression.format(field=field)
  378. conditions[fdId] = {"exp": expression, "agg": is_aggregated}
  379. continue
  380. elif op_dict == 'SPARK_EXPR':
  381. if 'formula' in cond_dict:
  382. formula = quote_identifier(cond_dict['formula'], formula=True)
  383. conditions[fdId] = {"exp": formula, "agg": is_aggregated}
  384. else:
  385. if isinstance(cond_dict['filterValue'], list) and len(cond_dict['filterValue']) == 1:
  386. field = quote_identifier(cond_dict['name'])
  387. value = cond_dict['filterValue'][0]
  388. conditions[fdId] = {"exp": f"{field} = {value}", "agg": is_aggregated}
  389. else:
  390. print(f"警告: 无法解析筛选条件,SPARK_EXPR中未定义。跳过此条件。")
  391. continue
  392. # 处理条件
  393. value_nums = op_dict["val_nums"]
  394. if value_nums != 0 and len(values) != value_nums:
  395. print(f"警告: 无法解析筛选条件,值数量与操作符不匹配。跳过此条件。")
  396. continue
  397. field = quote_identifier(field)
  398. # consolidation 情况,将consolidation公式替换条件左边的field
  399. if "consolidation" in cond_dict:
  400. consolidation = cond_dict["consolidation"]
  401. consolidation_field = get_consolidation_field(consolidation)
  402. if not consolidation_field:
  403. print(f"警告: 无法解析consolidation字段。跳过此条件。")
  404. continue
  405. else:
  406. field = consolidation_field
  407. else:
  408. # 公式,非 consolidation情况
  409. if "formula" in cond_dict:
  410. field = quote_identifier(cond_dict["formula"], formula=True)
  411. if op_name in ("NI", "IN") and len(values) == 0:
  412. print(f"警告: 无法解析筛选条件,IN或NI中参数个数为0。跳过此条件。")
  413. continue
  414. # 特殊情况
  415. if op_name in ('NI', 'IN') and None in values:
  416. conditions[fdId] = {"exp": f"{field} IS NOT NULL", "agg": is_aggregated}
  417. values = [x for x in values if x is not None]
  418. if len(values) == 0:
  419. continue
  420. # 填充模板所需要的参数
  421. format_args = get_format_args(field, fd_type, op_dict, values)
  422. condition_str = op_dict["template"].format(**format_args)
  423. conditions[fdId] = {"exp": condition_str, "agg": is_aggregated}
  424. return conditions
  425. def build_sql_query(card_data, added_fields_info, dataset_fid_name_map):
  426. card_id = card_data["card_id"]
  427. card_name = card_data["card_name"]
  428. dataset_id = card_data.get("ds_id")
  429. if not dataset_id:
  430. print(f"错误: {card_id} {card_name} 数据集ID为空.")
  431. return "", "", "", ""
  432. added_fields_info = get_added_fields_info(added_fields_info)
  433. dataset_fid_name_map = get_fid_name_map(dataset_fid_name_map)
  434. dimension_fids = parse_multi_value_field(card_data.get("field_id", []))
  435. dimension_fields = parse_multi_value_field(card_data.get("field_name", []))
  436. dimension_fid_name_map = dict(zip(dimension_fids, dimension_fields))
  437. dimension_name_fid_map = dict(zip(dimension_fields, dimension_fids))
  438. measure_fids = parse_multi_value_field(card_data.get("num_value_field_id", []))
  439. measure_fields = parse_multi_value_field(card_data.get("num_value_field_name", []))
  440. measure_aggs = parse_multi_value_field(card_data.get("num_value_field_merge_way", []))
  441. filter_relation_str = card_data.get("filters_field_value_name_rela")
  442. sort_fids = parse_multi_value_field(card_data.get("sort_field_id", []))
  443. sort_fields = parse_multi_value_field(card_data.get("sort_field_name", []))
  444. sort_method = parse_multi_value_field(card_data.get("sort_way", []))
  445. all_field_ids = dimension_fids + \
  446. parse_multi_value_field(card_data.get("filters_field_id", [])) + \
  447. sort_fids + \
  448. measure_fids
  449. all_field_names = dimension_fields + \
  450. parse_multi_value_field(card_data.get("filters_field_name", [])) + \
  451. sort_fields + \
  452. measure_fields
  453. all_field_id_name_map = dict(zip(all_field_ids, all_field_names))
  454. # 处理字段重命名关系
  455. fields_rename_map = get_fields_rename_map(card_data.get("field_info", ""))
  456. selected_fid_alias_map = dict(zip(dimension_fids+measure_fids, dimension_fields+measure_fields))
  457. # 构建WITH
  458. with_part = ""
  459. new_date_fields = []
  460. # 日期转换
  461. for fid, name in all_field_id_name_map.items():
  462. fid_splits = fid.split('_')
  463. if len(fid_splits) == 2:
  464. new_date_fields.append((fid, name))
  465. old_fid = fid_splits[0]
  466. selected_fid_alias_map[old_fid] = name
  467. # 新增维度字段
  468. new_dimension_fields = []
  469. for fid, name in dimension_fid_name_map.items():
  470. if fid in added_fields_info:
  471. new_dimension_fields.append((fid, name))
  472. # 如果有新增日期字段、新增维度字段,构建WITH
  473. if new_date_fields or new_dimension_fields:
  474. with_part = build_with_part(new_date_fields, new_dimension_fields, dataset_fid_name_map, added_fields_info, dataset_id)
  475. # 构建SELECT
  476. select_parts = []
  477. has_aggregation = False
  478. # 添加维度字段
  479. for field in dimension_fields:
  480. fid = dimension_name_fid_map[field]
  481. alias = fields_rename_map.get(field)
  482. if alias and alias != "null":
  483. select_parts.append(f"{quote_identifier(field)} AS {quote_identifier(alias)}")
  484. selected_fid_alias_map[fid] = alias
  485. else:
  486. select_parts.append(f"{quote_identifier(field)}")
  487. selected_fid_alias_map[fid] = field
  488. # 加工计算字段
  489. new_measure_fields, measure_aggs, agg_flag = process_calculation_fields(measure_fields, measure_aggs, added_fields_info, card_id, card_name)
  490. if agg_flag:
  491. has_aggregation = True
  492. for i, field in enumerate(new_measure_fields):
  493. fid = measure_fids[i]
  494. alias = fields_rename_map.get(field.strip('`'))
  495. agg_func_template = AGGREGATION_MAP.get(measure_aggs[i])
  496. if not agg_func_template:
  497. if not alias or alias == "null":
  498. alias = measure_fields[i]
  499. select_parts.append(f"{field} AS {quote_identifier(alias)}")
  500. selected_fid_alias_map[fid] = alias
  501. else:
  502. has_aggregation = True
  503. # 特殊处理 count distinct
  504. if '{}' in agg_func_template:
  505. agg_expression = agg_func_template.format(field)
  506. else:
  507. agg_expression = f"{agg_func_template}({field})"
  508. # 添加别名
  509. if not alias or alias == "null":
  510. suffix = AGGREGATION_SUFFIX_MAP.get(measure_aggs[i])
  511. alias = f"{measure_fields[i]}_{suffix}"
  512. select_parts.append(f"{agg_expression} AS {quote_identifier(alias)}")
  513. selected_fid_alias_map[fid] = alias
  514. if not select_parts:
  515. print(f"错误: {card_id} {card_name} 没有select字段。")
  516. return '', '', '', ''
  517. else:
  518. select_clause = "SELECT " + ",\n ".join(select_parts)
  519. # 构建FROM
  520. if with_part:
  521. from_clause = "FROM tmp"
  522. else:
  523. from_clause = f"FROM {quote_identifier(str(dataset_id))}"
  524. # 构建WHERE
  525. filter_conditions = {}
  526. try:
  527. filter_conditions = parse_filter_string(filter_relation_str)
  528. except Exception as e:
  529. print(f"错误: 卡片 {card_id} {card_name} 解析筛选条件出错:{e}。WHERE字句缺失。")
  530. print("详细错误信息:")
  531. print(traceback.format_exc())
  532. # 构建GROUPBY
  533. group_by_clause = ""
  534. if has_aggregation and dimension_fields:
  535. group_by_parts = [quote_identifier(field) for field in dimension_fields]
  536. group_by_clause = "GROUP BY " + ", ".join(group_by_parts)
  537. # 构建ORDERBY
  538. order_by_clause = ""
  539. if sort_fields and sort_method and len(sort_fields) == len(sort_method):
  540. order_by_parts = []
  541. for i, field in enumerate(sort_fields):
  542. fid = sort_fids[i]
  543. if fid not in selected_fid_alias_map:
  544. continue
  545. alias = selected_fid_alias_map[fid]
  546. order_by_parts.append(f"{quote_identifier(alias)} {sort_method[i]}")
  547. if order_by_parts:
  548. order_by_clause = "ORDER BY " + ", ".join(order_by_parts)
  549. # 组装SQL
  550. sql_parts = [with_part, select_clause, from_clause]
  551. return ("\n".join(sql_parts)).strip(), json.dumps(filter_conditions, ensure_ascii=False), group_by_clause, order_by_clause
  552. def generate():
  553. res_list = []
  554. df = pd.read_parquet("data/dev_card.parquet").reset_index()
  555. add_field_info = pd.read_parquet("data/dev_calc.parquet").set_index("card_id")
  556. all_field_info = pd.read_parquet("data/dev_field.parquet").set_index("ds_id")
  557. for i, row in df.iterrows():
  558. if i > 100:
  559. break
  560. row = row.to_dict()
  561. if row["card_type_cd"] != '图表' or row["ds_id"] == "":
  562. continue
  563. card_id = row["card_id"]
  564. try:
  565. added_fields_info = add_field_info.loc[[card_id]]
  566. except KeyError:
  567. added_fields_info = pd.DataFrame()
  568. try:
  569. dataset_fid_name_map = all_field_info.loc[[row["ds_id"]]]
  570. except KeyError:
  571. print(f"错误: 没有数据及字段信息: {card_id}")
  572. continue
  573. select, where, groupby, orderby = '', '', '', ''
  574. try:
  575. select, where, groupby, orderby = build_sql_query(row, added_fields_info, dataset_fid_name_map)
  576. except Exception as e:
  577. print(f"错误: 卡片 {card_id} 发生未知错误: {e}")
  578. print(i, traceback.format_exc())
  579. if not select:
  580. print(f"{card_id} 生成失败")
  581. continue
  582. res_list.append([str(card_id), str(row["card_name"]), select, where, groupby, orderby])
  583. res_df = pd.DataFrame(res_list, columns=["card_id", "card_name", "select", 'where', 'groupby', 'orderby'])
  584. return res_df
  585. if __name__ == "__main__":
  586. df = generate()
  587. df.to_parquet("output/sql.parquet")