ysl2007 vor 2 Monaten
Ursprung
Commit
de26ee1f1c
1 geänderte Dateien mit 402 neuen und 0 gelöschten Zeilen
  1. 402 0
      carddef2sql.py

+ 402 - 0
carddef2sql.py

@@ -0,0 +1,402 @@
+import ast
+import json
+import re
+import sys
+import traceback
+
+import pandas as pd
+
+# 聚合函数
+AGGREGATION_MAP = {
+    'SUM': 'SUM',
+    'AVG': 'AVG',
+    'CNT': 'COUNT',
+    'MAX': 'MAX',
+    'MIN': 'MIN',
+    'CNT_DISTINCT': 'COUNT(DISTINCT {})'
+}
+
+AGGREGATION_SUFFIX_MAP = {
+    'SUM': '求和',
+    'AVG': '均值',
+    'CNT': '计数',
+    'MAX': '最大值',
+    'MIN': '最小值',
+    'CNT_DISTINCT': '去重计数'
+}
+
+# 筛选操作符
+FILTER_OPERATOR_MAP = {
+    'BT': {"val_nums": 2, 'template': "{field} BETWEEN {value_1} AND {value_2}"},
+    'CLOSE_BT_OPEN': {"val_nums": 2, 'template': "{field} >= {value_1} AND {field} < {value_2}"},
+    'EQ': {"val_nums": 1, 'template': "{field} = {value}"},
+    'GE': {"val_nums": 1, 'template': "{field} >= {value}"},
+    'GT': {"val_nums": 1, 'template': "{field} > {value}"},
+    'IN': {"val_nums": 9, 'template': "{field} IN ({values})"},
+    'IS_NULL': {"val_nums": 0, 'template': "{field} IS NULL"},
+    'LE': {"val_nums": 1, 'template': "{field} <= {value}"},
+    'LT': {"val_nums": 1, 'template': "{field} < {value}"},
+    'NE': {"val_nums": 1, 'template': "{field} != {value}"},
+    'NI': {"val_nums": 9, 'template': "{field} NOT IN ({values})"},
+    'NOT_NULL': {"val_nums": 0, 'template': "{field} IS NOT NULL"},
+    'OPEN_BT_CLOSE': {"val_nums": 2, 'template': "{field} > {value_1} AND {field} <= {value_2}"},
+    'OPEN_BT_OPEN': {"val_nums": 2, 'template': "{field} > {value_1} AND {field} < {value_2}"},
+    'CONTAINS': {"val_nums": 1, 'template': "{field} LIKE '%{value}%'"},
+    'NOT_CONTAINS': {"val_nums": 1, 'template': "{field} NOT LIKE '%{value}%'"},
+    'STARTS_WITH': {"val_nums": 1, 'template': "{field} LIKE '{value}%'"},
+    'NOT_STARTS_WITH': {"val_nums": 1, 'template': "{field} NOT LIKE '{value}%'"},
+    'ENDS_WITH': {"val_nums": 1, 'template': "{field} LIKE '%{value}'"},
+    'NOT_ENDS_WITH': {"val_nums": 1, 'template': "{field} NOT LIKE '%{value}'"},
+    'CUSTOM': 'CUSTOM',
+    'SPARK_EXPR': 'SPARK_EXPR'
+}
+
+# 引用
+IDENTIFIER_QUOTE = '`'
+QUOTE_FLAG = True
+
+# 副词
+ADV_FILTER_EXP_MAP = {
+    'TODAY': "{field} = '{{today}}'",
+    'YESTERDAY': "{field} = date_sub('{{today}}', 1)",
+    'DAY_BEFORE_YESTERDAY': "{field} = date_sub('{{today}}', 2)",
+    'LAST_14_DAY': "{field} between date_sub('{{today}}', 13) and '{{today}}'",
+    'LAST_1_YEAR': "{field} between date_sub('{{today}}', 364) and '{{today}}'",
+    'LAST_30_DAY': "{field} between date_sub('{{today}}', 29) and '{{today}}'",
+    'LAST_7_DAY': "{field} between date_sub('{{today}}', 6) and '{{today}}'",
+    'LAST_90_DAY': "{field} between date_sub('{{today}}', 89) and '{{today}}'",
+    'LAST_MONTH': "{field} between datesub(add_months('{{today}}', -1), day('{{today}}') - 1) and date_sub('{{today}}', day('{{today}}'))",
+    'LAST_WEEK': "{field} between date_sub('{{today}}', case when dayofweek('{{today}}') = 1 then 13 else dayofweek('{{today}}')+5 end) and date_sub('{{today}}', case when dayofweek('{{today}}') = 1 then 7 else dayofweek('{{today}}')-1 end)",
+    'MONTH_BEFORE_LAST_MONTH': "{field} between date_sub(add_months('{{today}}', -2), day(add_months('{{today}}', -2))-1) and date_sub(add_months('{{today}}', -1), day(add_months('{{today}}', -1)))",
+    'MONTH_TO_DAY': "{field} between date_sub('{{today}}', day('{{today}}')-1) and '{{today}}'",
+    'MONTH_TO_YESTERDAY': "{field} between date_sub('{{today}}', day('{{today}}')-1) and date_sub('{{today}}', 1)",
+    'QUARTER_TO_DAY': """{field} between concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01'
+                                                                     when quarter('{{today}}') = 2 then '-04-01'
+                                                                     when quarter('{{today}}') = 3 then '-07-01'
+                                                                     when quarter('{{today}}') = 4 then '-10-01' end) and '{{today}}'""",
+    'QUARTER_TO_YESTERDAY': """{field} between concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01'
+                                                                                when quarter('{{today}}') = 2 then '-04-01'
+                                                                                when quarter('{{today}}') = 3 then '-07-01'
+                                                                                when quarter('{{today}}') = 4 then '-10-01' end) and date_sub('{{today}}', 1)""",
+    'WEEK_TO_DAY': "{field} between date_sub('{{today}}', (dayofweek('{{today}}')+5)%7) and '{{today}}'",
+    'WEEK_TO_YESTERDAY': "{field} between date_sub('{{today}}', (dayofweek('{{today}}')+5)%7) and date_sub('{{today}}', 1)",
+    'YEAR_TO_DAY': "{field} between concat(year('{{today}}'), '-01-01') and '{{today}}'",
+    'YEAR_TO_LAST_MONTH': "{field} between concat(year('{{today}}'), '-01-01') and date_sub('{{today}}', day('{{today}}'))",
+    'YEAR_TO_LAST_QUARTER': """{field} between concat(year('{{today}}'), '-01-01') and date_sub(concat(year('{{today}}'), case when quarter('{{today}}') = 1 then '-01-01'
+                                                                                when quarter('{{today}}') = 2 then '-04-01'
+                                                                                when quarter('{{today}}') = 3 then '-07-01'
+                                                                                when quarter('{{today}}') = 4 then '-10-01' end), 1)""",
+    'YEAR_TO_YESTERDAY': "{field} between concat(year('{{today}}'), '-01-01') and date_sub('{{today}}', 1)",
+}
+
+# 自带日期转换
+PARTIAL_DATE_EXPRESSION = {
+    'day': "`{old}` as `{new}`",
+    'month': "concat(year(`{old}`), '-', lpad(month(`{old}`), 2, '0') as `{new}`",
+    'year': "year(`{old}`) as `{new}`",
+    'quarter': "concat(year(`{old}`), '-S', quarter(`{old}`)) as `{new}`",
+    'week': "weekofyear(`{old}`) as `{new}`",
+    'dayofweek': "dayofweek(`{old}`)-1 as `{new}`",
+    'hour': "hour(`{old}`)-8 as `{new}`",
+    'minute': "minute(`{old}`)-8 as `{new}`",
+}
+
+# 获取新增字段
+def get_added_fields_info(added_fields_df):
+    if added_fields_df.empty:
+        return {}
+    added_fields_info = {}
+    for _, row in added_fields_df.iterrows():
+        try:
+            content = json.loads(row['calc_field_logic'])
+        except:
+            print(f'ERROR: 新增字段解析错误: {row["calc_field_logic"]}')
+            continue
+        field_name = content['name']
+        field_id = content['fdId']
+        added_fields_info[field_id] = {"field_name": field_name, 'calculation': content}
+        added_fields_info[field_name] = {"field_id": field_id, 'calculation': content}
+    return added_fields_info
+
+def get_fid_name_map(field_def_df):
+    field_id_name = {}
+    for i, row in field_def_df.iterrows():
+        field_id_name[row['field_id']] = row['field_name']
+    return field_id_name
+
+# 字段映射关系
+def get_fields_rename_map(field_info):
+    ret = {}
+    try:
+        tmp_map = json.loads(field_info)
+    except:
+        return ret
+    dimensions, metrics = tmp_map.get("dimensions"), tmp_map.get("metrics")
+    if dimensions and dimensions != 'null':
+        for one_map in dimensions:
+            ret[one_map["name"]] = one_map["alias"]
+    if metrics and metrics != 'null':
+        for one_map in metrics:
+            ret[one_map["name"]] = one_map["alias"]
+    return ret
+
+# 处理过滤条件中的consolidation
+def get_consolidation_field(consolidation_dict):
+    field_name = quote_identifier(consolidation_dict["sourceName"])
+    group_type = consolidation_dict["groupType"]
+    fd_type = consolidation_dict["sourceFdType"]
+    group_rules = consolidation_dict.get('groups')
+    fixed_step = consolidation_dict.get('fixedStepSetting')
+    when_part = []
+    else_part = None
+    if group_type == 'ITEM':
+        for group in group_rules:
+            group_name = group["groupName"]
+            if group.get('isOtherGroup', False):
+                else_part = f"ELSE '{group_name}'"
+            else:
+                selected_values = group.get('selectedValues', [])
+                when_value = str(selected_values)
+                when_value = when_value[1:-1] # 去除中括号
+                if when_value is None:
+                    when_part.append(f"WHEN {field_name} IS NULL THEN '{group_name}'")
+                else:
+                    when_part.append(f"WHEN {field_name} IN ({when_value}) THEN '{group_name}'")
+    elif group_type == "CONDITION":
+        for group in group_rules:
+            group_name = group["groupName"]
+            if group.get('isOtherGroup', False):
+                else_part = f"ELSE '{group_name}'"
+            else:
+                rules = group["rules"]
+                cond_list = []
+                combine_type = " " + rules["combineType"] + " " # 加空格以便join
+                for cond in rules["conditions"]:
+                    filter_type = cond["filterType"]
+                    filter_value = cond["filterValue"]
+                    op_dict = FILTER_OPERATOR_MAP[filter_type]
+                    format_args = get_format_args(field_name, fd_type, op_dict, [filter_value])
+                    cond_str = op_dict["template"].format(**format_args)
+                    cond_list.append(cond_str)
+                when_str = "WHEN " + combine_type.join(cond_list) + f" THEN '{group_name}'"
+                when_part.append(when_str)
+    elif group_type == "CUSTOM_STEP":
+        for group in group_rules:
+            group_name = group["groupName"]
+            if group.get('isOtherGroup', False):
+                else_part = f"ELSE '{group_name}'"
+            else:
+                setting = group['customStepSetting']
+                operator = setting['operator']
+                start = setting['startValue']
+                end = setting['endValue']
+                condition = ''
+                if operator == 'BT':
+                    condition = f"{field_name} BETWEEN {start} AND {end}"
+                elif operator == 'OPEN_BT_CLOSE':
+                    condition = f"{field_name} > {start} AND {field_name} <= {end}"
+                elif operator == 'OPEN_BT_OPEN':
+                    condition = f"{field_name} > {start} AND {field_name} < {end}"
+                elif operator == 'CLOSE_BT_OPEN':
+                    condition = f"{field_name} >= {start} AND {field_name} < {end}"
+                else:
+                    raise ValueError(f"未知的操作符: {operator}")
+                when_part.append(f"WHEN {condition} THEN '{group_name}'")
+    elif group_type == "FIXED_STEP":
+        start = fixed_step['startValue']
+        end = fixed_step['endValue']
+        step = fixed_step['stepSize']
+        # 生成每个区间的case when部分
+        lower = start
+        while lower < end:
+            upper = lower + step
+            case_part = f"WHEN {field_name} >= {lower} AND {field_name} < {upper} THEN '{lower}-{upper}'"
+            when_part.append(case_part)
+            lower = upper
+        # 处理最后一个区间
+        case_part = f"WHEN {field_name} >= {lower} AND {field_name} <= {end} THEN '{lower}-{end}'"
+        when_part.append(case_part)
+        else_part = "ELSE NULL"
+    else:
+        raise ValueError(f"未知的groupType: {group_type}")
+    field = "CASE "
+    field += "\n".join(when_part)
+    if else_part:
+        field += f"\n{else_part}"
+    field += "\nEND"
+    return field
+
+def build_with_part(new_date_fields, new_dimension_fields, dataset_fid_name_map, added_fields_info, dataset_id):
+    sql_part = 'WITH tmp as (\nSELECT *,\n'
+    with_expressions = []
+    for fid, new_name in new_date_fields:
+        old_fid, partial_date = fid.split('_')
+        if old_fid in dataset_fid_name_map:
+            old_name = dataset_fid_name_map[old_fid]
+        elif old_fid in added_fields_info:
+            old_name = added_fields_info[old_fid]["calculation"]["formula"].replace("[]", "").replace("]", '')
+        else:
+            raise ValueError(f"字段 {fid} {new_name} 不存在")
+        tmp_part = PARTIAL_DATE_EXPRESSION.get(partial_date, None)
+        if tmp_part:
+            tmp_part = tmp_part.format(old=old_name, new=new_name)
+            with_expressions.append(tmp_part)
+        else:
+            raise ValueError(f"日期转换方式 {partial_date} 不存在")
+    for fid, new_name in new_dimension_fields:
+        field_def = added_fields_info[fid]
+        new_name = field_def["field_name"]
+        formula = field_def["calculation"]["formula"]
+        if "consolidation" in formula:
+            consolidation_dict = json.loads(formula)["consolidation"]
+            tmp_part = get_consolidation_field(consolidation_dict)
+            tmp_part += f" AS `{new_name}`"
+        else:
+            tmp_part = quote_identifier(formula, formula=True) + f" AS `{new_name}`"
+        with_expressions.append(tmp_part)
+    sql_part += ',\n'.join(with_expressions)
+    sql_part += f"\nFROM `{dataset_id}\n`"
+    return sql_part
+
+# 处理计算字段
+def process_calculation_fields(measure_fields, measure_aggs, calculation_fields, card_id, card_name):
+    ## 数值字段数量 小于 聚合函数数量,不合法
+    if len(measure_fields) < len(measure_aggs):
+        print(f"警告: 卡片 {card_id} {card_name}: 数值字段数量小于聚合函数数量,不合法")
+        print(f"警告: 卡片 {card_id} {card_name}: 不添加任何数值字段.")
+        return [], [], False
+    ## 数值字段 大于 聚合函数数量,存在聚合类型的计算字段,尝试填充
+    elif len(measure_fields) > len(measure_aggs):
+        ## 计算数值字段数量
+        num_cals = 0
+        for field in measure_fields:
+            if field in calculation_fields and calculation_fields[field]["calculation"]["isAggregated"] is True:
+                num_cals += 1
+        ## 如果不存在任何计算字段,补全剩余的NUL聚合函数
+        if num_cals == 0:
+            measure_aggs.extend(['NULL'] * (len(measure_fields) - len(measure_aggs)))
+            return measure_fields, measure_aggs, True
+        ## 如果存在计算字段,且相加后的 聚合函数数量 仍小于 数值字段数量,不合法
+        if num_cals + len(measure_aggs) != len(measure_fields):
+            print(f"警告: 卡片 {card_id} {card_name}: 数值字段数量大于聚合函数数量,不合法")
+            print(f"警告: 卡片 {card_id} {card_name}: 不添加任何数值字段.")
+            return [], [], False
+    ## 通过验证,填充聚合函数
+    new_measure_fields, new_measure_aggs, agg_flag = [], [], False
+    for i, field in enumerate(measure_fields):
+        ## 非计算字段
+        if field not in calculation_fields:
+            new_measure_fields.append(quote_identifier(field))
+            new_measure_aggs.append(measure_aggs.pop(0))
+        ## 计算字段
+        else:
+            formula = calculation_fields[field]["calculation"]["formula"]
+            formula = formula.replace('\n', '')
+            new_measure_fields.append(quote_identifier(formula, formula=True))
+            if calculation_fields[field]["calculation"]["isAggregated"] is True:
+                new_measure_aggs.append("NUL")
+                agg_flag = True
+            else:
+                new_measure_aggs.append(measure_aggs.pop(0))
+    return new_measure_fields, new_measure_aggs, agg_flag
+
+def quote_identifier(identifier, formula=False):
+    if not QUOTE_FLAG:
+        return identifier
+    if not identifier:
+        return ''
+    # 简单处理,如果包含非字母数字下划线或可能是关键字,则加反引号
+    # 更复杂的关键字检查可以添加
+    if formula:
+        params = re.findall(r"\[DYNAMIC_PARAMS\.\w+\]", identifier)
+        for p in params:
+            subs = p[1:-1]
+            subs = "{{{"+subs+"}}}"
+            identifier = identifier.replace(p, subs, 1)
+        identifier = identifier.replace('[', IDENTIFIER_QUOTE).replace(']', IDENTIFIER_QUOTE)
+    else:
+        identifier = identifier.replace('\n', ' ')
+        if not re.match(r'[a-zA-Z_][a-zA-Z0-9_]*$', identifier):
+            return f'{IDENTIFIER_QUOTE}{identifier}{IDENTIFIER_QUOTE}'
+    return identifier
+
+def parse_multi_value_field(field_value):
+    # 解析包含多个值的字段
+    if not field_value or field_value == "":
+        return []
+    try:
+        res = ast.literal_eval(field_value)
+    except Exception:
+        print(field_value)
+        print(traceback.format_exc())
+    return ast.literal_eval(field_value)
+
+# 处理过滤条件的操作符
+def get_format_args(field, fd_type, op_dict, values):
+    # 按照数据类型及操作符,判断是否需要加引号
+    if fd_type in ('DECIMAL', 'DOUBLE', 'INT', 'FLOAT', 'LONG', 'SHORT'):
+        values = [x for x in values if x]
+    elif fd_type in ('DATE', 'STRING', 'SUB_DATE', 'TIMESTAMP'):
+        if op_dict.get('quote', True):
+            values = [f"'{x}'" for x in values]
+    elif fd_type == 'BOOL':
+        values = [value.upper() for value in values]
+    else:
+        pass
+    # 按照操作符所需参数个数构造format参数
+    format_dict = {}
+    value_nums = op_dict['val_nums']
+    if value_nums == 9:
+        format_dict.update(**{"values": ", ".join(values)})
+    elif value_nums == 2:
+        format_dict.update(**{"value_1": values[0], "value_2": values[1]})
+    elif value_nums == 1:
+        format_dict.update(**{"value": values[0]})
+    else:
+        pass
+    format_dict["field"] = field
+    return format_dict
+
+def parse_filter_string(filter_relation_str):
+    conditions = {}
+    if not filter_relation_str or filter_relation_str == "[]":
+        return conditions
+
+    raw_conditions = json.loads(filter_relation_str)
+    for cond_dict in raw_conditions:
+        fdId = cond_dict.get("fdId")
+        field = cond_dict.get("name")
+        fd_type = cond_dict.get("fdType")
+        op_name = cond_dict.get("filterType")
+        op_dict = FILTER_OPERATOR_MAP.get(op_name)
+        values = cond_dict.get("filterValue") # list
+        is_aggregated = cond_dict.get("isAggregated", False)
+
+        # 检查条件合法
+        if any([fdId is None, field is None, fd_type is None, op_name is None, values is None]):
+            print(f"fdId: {fdId} field: {field} fd_type: {fd_type} op_name: {op_name} values: {values}")
+            print(f"警告: 无法解析筛选条件,缺少必须字段,跳过此条件。")
+            continue
+        if op_dict is None:
+            print(f"警告: 无法解析筛选条件,未定义的筛选类型: {op_name},跳过此条件。")
+            continue
+
+        # 特殊操作符
+        if op_dict == 'CUSTOM':
+            if "advFilter" not in cond_dict:
+                print(f"警告: CUSTOM筛选类型不存在advFilter, 跳过此条件。")
+                continue
+            if 'formula' in cond_dict:
+                field = quote_identifier(cond_dict['formula'], formula=True)
+            else:
+                field = quote_identifier(cond_dict['name'])
+            expression = ADV_FILTER_EXP_MAP.get(cond_dict["advFilter"])
+            if not expression:
+                print(f"警告: CUSTOM筛选类型出现未定义的advFilter: {cond_dict['advFilter']}, 跳过此条件。")
+                continue
+            expression = expression.format(field=field)
+            conditions[fdId] = {"exp": expression, "agg": is_aggregated}
+            continue
+        elif op_dict == 'SPARK_EXPR':
+            if 'formula' in cond_dict:
+