|
|
@@ -140,92 +140,6 @@ def get_fields_rename_map(field_info):
|
|
|
ret[one_map["name"]] = one_map["alias"]
|
|
|
return ret
|
|
|
|
|
|
-# 处理过滤条件中的consolidation
|
|
|
-def get_consolidation_field(consolidation_dict):
|
|
|
- field_name = quote_identifier(consolidation_dict["sourceName"])
|
|
|
- group_type = consolidation_dict["groupType"]
|
|
|
- fd_type = consolidation_dict["sourceFdType"]
|
|
|
- group_rules = consolidation_dict.get('groups')
|
|
|
- fixed_step = consolidation_dict.get('fixedStepSetting')
|
|
|
- when_part = []
|
|
|
- else_part = None
|
|
|
- if group_type == 'ITEM':
|
|
|
- for group in group_rules:
|
|
|
- group_name = group["groupName"]
|
|
|
- if group.get('isOtherGroup', False):
|
|
|
- else_part = f"ELSE '{group_name}'"
|
|
|
- else:
|
|
|
- selected_values = group.get('selectedValues', [])
|
|
|
- when_value = str(selected_values)
|
|
|
- when_value = when_value[1:-1] # 去除中括号
|
|
|
- if when_value is None:
|
|
|
- when_part.append(f"WHEN {field_name} IS NULL THEN '{group_name}'")
|
|
|
- else:
|
|
|
- when_part.append(f"WHEN {field_name} IN ({when_value}) THEN '{group_name}'")
|
|
|
- elif group_type == "CONDITION":
|
|
|
- for group in group_rules:
|
|
|
- group_name = group["groupName"]
|
|
|
- if group.get('isOtherGroup', False):
|
|
|
- else_part = f"ELSE '{group_name}'"
|
|
|
- else:
|
|
|
- rules = group["rules"]
|
|
|
- cond_list = []
|
|
|
- combine_type = " " + rules["combineType"] + " " # 加空格以便join
|
|
|
- for cond in rules["conditions"]:
|
|
|
- filter_type = cond["filterType"]
|
|
|
- filter_value = cond["filterValue"]
|
|
|
- op_dict = FILTER_OPERATOR_MAP[filter_type]
|
|
|
- format_args = get_format_args(field_name, fd_type, op_dict, [filter_value])
|
|
|
- cond_str = op_dict["template"].format(**format_args)
|
|
|
- cond_list.append(cond_str)
|
|
|
- when_str = "WHEN " + combine_type.join(cond_list) + f" THEN '{group_name}'"
|
|
|
- when_part.append(when_str)
|
|
|
- elif group_type == "CUSTOM_STEP":
|
|
|
- for group in group_rules:
|
|
|
- group_name = group["groupName"]
|
|
|
- if group.get('isOtherGroup', False):
|
|
|
- else_part = f"ELSE '{group_name}'"
|
|
|
- else:
|
|
|
- setting = group['customStepSetting']
|
|
|
- operator = setting['operator']
|
|
|
- start = setting['startValue']
|
|
|
- end = setting['endValue']
|
|
|
- condition = ''
|
|
|
- if operator == 'BT':
|
|
|
- condition = f"{field_name} BETWEEN {start} AND {end}"
|
|
|
- elif operator == 'OPEN_BT_CLOSE':
|
|
|
- condition = f"{field_name} > {start} AND {field_name} <= {end}"
|
|
|
- elif operator == 'OPEN_BT_OPEN':
|
|
|
- condition = f"{field_name} > {start} AND {field_name} < {end}"
|
|
|
- elif operator == 'CLOSE_BT_OPEN':
|
|
|
- condition = f"{field_name} >= {start} AND {field_name} < {end}"
|
|
|
- else:
|
|
|
- raise ValueError(f"未知的操作符: {operator}")
|
|
|
- when_part.append(f"WHEN {condition} THEN '{group_name}'")
|
|
|
- elif group_type == "FIXED_STEP":
|
|
|
- start = fixed_step['startValue']
|
|
|
- end = fixed_step['endValue']
|
|
|
- step = fixed_step['stepSize']
|
|
|
- # 生成每个区间的case when部分
|
|
|
- lower = start
|
|
|
- while lower < end:
|
|
|
- upper = lower + step
|
|
|
- case_part = f"WHEN {field_name} >= {lower} AND {field_name} < {upper} THEN '{lower}-{upper}'"
|
|
|
- when_part.append(case_part)
|
|
|
- lower = upper
|
|
|
- # 处理最后一个区间
|
|
|
- case_part = f"WHEN {field_name} >= {lower} AND {field_name} <= {end} THEN '{lower}-{end}'"
|
|
|
- when_part.append(case_part)
|
|
|
- else_part = "ELSE NULL"
|
|
|
- else:
|
|
|
- raise ValueError(f"未知的groupType: {group_type}")
|
|
|
- field = "CASE "
|
|
|
- field += "\n".join(when_part)
|
|
|
- if else_part:
|
|
|
- field += f"\n{else_part}"
|
|
|
- field += "\nEND"
|
|
|
- return field
|
|
|
-
|
|
|
def build_with_part(new_date_fields, new_dimension_fields, dataset_fid_name_map, added_fields_info, dataset_id):
|
|
|
sql_part = 'WITH tmp as (\nSELECT *,\n'
|
|
|
with_expressions = []
|
|
|
@@ -234,7 +148,7 @@ def build_with_part(new_date_fields, new_dimension_fields, dataset_fid_name_map,
|
|
|
if old_fid in dataset_fid_name_map:
|
|
|
old_name = dataset_fid_name_map[old_fid]
|
|
|
elif old_fid in added_fields_info:
|
|
|
- old_name = added_fields_info[old_fid]["calculation"]["formula"].replace("[]", "").replace("]", '')
|
|
|
+ old_name = added_fields_info[old_fid]["calculation"]["formula"].replace("[", "").replace("]", '')
|
|
|
else:
|
|
|
raise ValueError(f"字段 {fid} {new_name} 不存在")
|
|
|
tmp_part = PARTIAL_DATE_EXPRESSION.get(partial_date, None)
|
|
|
@@ -313,7 +227,14 @@ def quote_identifier(identifier, formula=False):
|
|
|
subs = p[1:-1]
|
|
|
subs = "{{{"+subs+"}}}"
|
|
|
identifier = identifier.replace(p, subs, 1)
|
|
|
- identifier = identifier.replace('[', IDENTIFIER_QUOTE).replace(']', IDENTIFIER_QUOTE)
|
|
|
+ # 仅替换配置里用于包裹字段名的 [字段],保留 Hive 下标访问里的 [2] 等表达式
|
|
|
+ def replace_bracket_identifier(match):
|
|
|
+ content = match.group(1)
|
|
|
+ if re.fullmatch(r"\d+", content.strip()):
|
|
|
+ return match.group(0)
|
|
|
+ return f"{IDENTIFIER_QUOTE}{content}{IDENTIFIER_QUOTE}"
|
|
|
+
|
|
|
+ identifier = re.sub(r"\[([^\[\]]+)\]", replace_bracket_identifier, identifier)
|
|
|
else:
|
|
|
identifier = identifier.replace('\n', ' ')
|
|
|
if not re.match(r'[a-zA-Z_][a-zA-Z0-9_]*$', identifier):
|
|
|
@@ -357,6 +278,92 @@ def get_format_args(field, fd_type, op_dict, values):
|
|
|
format_dict["field"] = field
|
|
|
return format_dict
|
|
|
|
|
|
# Handle the "consolidation" (value grouping) used in filter conditions.
def _sql_string_literal(value):
    """Return *value* rendered as a single-quoted SQL string literal.

    Backslashes and single quotes are backslash-escaped so that a group name
    containing them cannot terminate the literal early.
    NOTE(review): backslash escaping assumes a Hive-style dialect (another
    part of this file refers to Hive subscript syntax) -- confirm.
    """
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{text}'"


def _custom_step_condition(field_name, operator, start, end):
    """Build the range predicate for one CUSTOM_STEP group.

    Raises:
        ValueError: if *operator* is not one of the four supported codes.
    """
    if operator == 'BT':
        return f"{field_name} BETWEEN {start} AND {end}"
    if operator == 'OPEN_BT_CLOSE':
        return f"{field_name} > {start} AND {field_name} <= {end}"
    if operator == 'OPEN_BT_OPEN':
        return f"{field_name} > {start} AND {field_name} < {end}"
    if operator == 'CLOSE_BT_OPEN':
        return f"{field_name} >= {start} AND {field_name} < {end}"
    raise ValueError(f"未知的操作符: {operator}")


def _fixed_step_clauses(field_name, start, end, step):
    """Return the WHEN clauses that split [start, end] into *step*-wide buckets.

    Every bucket is half-open ``[lower, upper)`` except the last one, which is
    closed at *end* so that the end value itself is labelled and no bucket
    extends beyond *end*.

    Raises:
        ValueError: if *step* is not positive (the loop could never finish).
    """
    if step <= 0:
        raise ValueError(f"stepSize 必须大于 0: {step}")
    clauses = []
    lower = start
    # Stop one bucket early: the remaining range is emitted below as the
    # closed final bucket, even when step does not divide the range evenly.
    while lower + step < end:
        upper = lower + step
        clauses.append(
            f"WHEN {field_name} >= {lower} AND {field_name} < {upper} THEN '{lower}-{upper}'"
        )
        lower = upper
    clauses.append(
        f"WHEN {field_name} >= {lower} AND {field_name} <= {end} THEN '{lower}-{end}'"
    )
    return clauses


def get_consolidation_field(consolidation_dict):
    """Translate a consolidation (grouping) config into a SQL CASE expression.

    Keys read from *consolidation_dict*:
        sourceName: field name, passed through ``quote_identifier``.
        groupType: one of ``ITEM``, ``CONDITION``, ``CUSTOM_STEP``,
            ``FIXED_STEP``.
        sourceFdType: field type, forwarded to ``get_format_args``.
        groups: list of group dicts (all types except ``FIXED_STEP``). A group
            flagged ``isOtherGroup`` supplies the ELSE label.
        fixedStepSetting: dict with ``startValue``, ``endValue`` and
            ``stepSize`` (``FIXED_STEP`` only).

    Returns:
        A string of the form ``CASE WHEN ... [ELSE ...] END`` with clauses
        separated by newlines. If no WHEN clause could be built, the ELSE
        label (or ``NULL``) is returned on its own, because a CASE without
        any WHEN is not valid SQL.

    Raises:
        ValueError: unknown ``groupType``, unknown CUSTOM_STEP operator, or a
            non-positive ``stepSize``.
    """
    field_name = quote_identifier(consolidation_dict["sourceName"])
    group_type = consolidation_dict["groupType"]
    fd_type = consolidation_dict["sourceFdType"]
    group_rules = consolidation_dict.get('groups') or []
    fixed_step = consolidation_dict.get('fixedStepSetting')
    when_part = []
    else_value = None  # SQL expression for the ELSE branch, if any

    if group_type == 'ITEM':
        for group in group_rules:
            label = _sql_string_literal(group["groupName"])
            if group.get('isOtherGroup', False):
                else_value = label
                continue
            selected_values = group.get('selectedValues') or []
            # NULL never matches IN (...), so it needs its own IS NULL test.
            # repr() yields the same text the list's str() did for each item.
            non_null = [repr(value) for value in selected_values if value is not None]
            tests = []
            if non_null:
                tests.append(f"{field_name} IN ({', '.join(non_null)})")
            if len(non_null) != len(selected_values):
                tests.append(f"{field_name} IS NULL")
            # A group without any selected value can match nothing; skipping
            # it avoids emitting the invalid "IN ()".
            if tests:
                when_part.append(f"WHEN {' OR '.join(tests)} THEN {label}")
    elif group_type == "CONDITION":
        for group in group_rules:
            label = _sql_string_literal(group["groupName"])
            if group.get('isOtherGroup', False):
                else_value = label
                continue
            rules = group["rules"]
            combine_type = " " + rules["combineType"] + " "  # padded for join
            cond_list = []
            for cond in rules["conditions"]:
                op_dict = FILTER_OPERATOR_MAP[cond["filterType"]]
                format_args = get_format_args(field_name, fd_type, op_dict, [cond["filterValue"]])
                cond_list.append(op_dict["template"].format(**format_args))
            # No conditions would produce "WHEN  THEN ...", which is invalid.
            if cond_list:
                when_part.append("WHEN " + combine_type.join(cond_list) + f" THEN {label}")
    elif group_type == "CUSTOM_STEP":
        for group in group_rules:
            label = _sql_string_literal(group["groupName"])
            if group.get('isOtherGroup', False):
                else_value = label
                continue
            setting = group['customStepSetting']
            condition = _custom_step_condition(
                field_name,
                setting['operator'],
                setting['startValue'],
                setting['endValue'],
            )
            when_part.append(f"WHEN {condition} THEN {label}")
    elif group_type == "FIXED_STEP":
        when_part.extend(
            _fixed_step_clauses(
                field_name,
                fixed_step['startValue'],
                fixed_step['endValue'],
                fixed_step['stepSize'],
            )
        )
        else_value = "NULL"
    else:
        raise ValueError(f"未知的groupType: {group_type}")

    if not when_part:
        # "CASE END" / "CASE ELSE x END" are syntax errors; the expression
        # would always evaluate to the ELSE value anyway.
        return else_value if else_value is not None else "NULL"

    field = "CASE " + "\n".join(when_part)
    if else_value is not None:
        field += f"\nELSE {else_value}"
    field += "\nEND"
    return field
|
|
|
+
|
|
|
def parse_filter_string(filter_relation_str):
|
|
|
conditions = {}
|
|
|
if not filter_relation_str or filter_relation_str == "[]":
|
|
|
@@ -595,9 +602,9 @@ def build_sql_query(card_data, added_fields_info, dataset_fid_name_map):
|
|
|
|
|
|
def generate():
|
|
|
res_list = []
|
|
|
- df = pd.read_parquet("data/dev_card.parquet").reset_index()
|
|
|
- add_field_info = pd.read_parquet("data/dev_calc.parquet").set_index("card_id")
|
|
|
- all_field_info = pd.read_parquet("data/dev_field.parquet").set_index("ds_id")
|
|
|
+ df = pd.read_csv("data/card.csv").fillna("").reset_index()
|
|
|
+ add_field_info = pd.read_csv("data/calc.csv").fillna('').set_index("card_id")
|
|
|
+ all_field_info = pd.read_csv("data/field.csv").fillna('').set_index("ds_id")
|
|
|
for i, row in df.iterrows():
|
|
|
if i > 100:
|
|
|
break
|