aios/sql/python/sql_utils.py (369 lines of code) (raw):

import os import json import logging import sql_envs def get_file_as_json(file_path): if not os.path.exists(file_path): return {} content = open(file_path).read() return json.loads(content, strict=False) def get_ha3_cluster_map(config_path): base_path = os.path.join(config_path, "clusters") clusters = os.listdir(base_path) cluster_map = {} suffix = "_cluster.json" for f in clusters: file_path = os.path.join(base_path, f) if not f.endswith(suffix): continue cluster_name = f[:-len(suffix)] info = get_file_as_json(os.path.join(base_path, f)) cluster_map[cluster_name] = info return cluster_map def get_zone_list(config_path): zones_path = os.path.join(config_path, "zones") if not os.path.exists(zones_path): return [] return os.listdir(zones_path) def get_default_udf_function_models(binary_path): udf_file = os.path.join(binary_path, "usr/local/etc/sql/sql_function.json") info = get_file_as_json(udf_file) return info.get("functions", []) def get_system_udf_function_models(config_path): system_function = get_db_function(config_path, "system") return system_function.get("functions", []) def zone_name_to_db_name(config_path, zone_name): zone_config_path = os.path.join(config_path, "zones/" + zone_name) if os.path.exists(os.path.join(zone_config_path, "biz.json")): return zone_name else: return zone_name.split(".")[0] def get_db_function(config_path, db_name): func_path = os.path.join(config_path, "sql") func_file = os.path.join(func_path, db_name + "_function.json") return get_file_as_json(func_file) def get_db_function_map(config_path): db_func = {} zone_list = get_zone_list(config_path) for zone_name in zone_list: db_name = zone_name_to_db_name(config_path, zone_name) zone_function = get_db_function(config_path, db_name) functions_array = zone_function.get("functions", []) db_func[db_name] = functions_array return db_func def get_special_catalogs(): catalog_str = sql_envs.get_special_catalog_list() if catalog_str: return catalog_str.split(",") else: return [] def get_zone_config(config_path, zone_name): biz_json_file = os.path.join(config_path, "zones", zone_name, "default_biz.json") return get_file_as_json(biz_json_file) def get_table_schema(config_path, table_name): schema_json_file = os.path.join(config_path, "schemas", table_name + "_schema.json") return get_file_as_json(schema_json_file) def get_item_table_name(config_path, zone_name): zone_config = get_zone_config(config_path, zone_name) cluster_config = zone_config.get("cluster_config", {}) table_name_1 = cluster_config.get("table_name", "") schema_config = get_table_schema(config_path, table_name_1) return schema_config.get("table_name", "") def get_join_config(config_path, zone_name): zone_config = get_zone_config(config_path, zone_name) cluster_config = zone_config.get("cluster_config", {}) return cluster_config.get("join_config", {}) def get_join_infos(config_path, zone_name): join_config = get_join_config(config_path, zone_name) return join_config.get("join_infos", []) def get_turing_options_config(config_path, zone_name): zone_config = get_zone_config(config_path, zone_name) return zone_config.get("turing_options_config", {}) def get_flow_control_config(config_path): flow_control_config_map = {} for zone_name in get_zone_list(config_path): zone_config = get_zone_config(config_path, zone_name) config = zone_config.get("multi_call_config_sql", {}) if not config: continue flow_control_config_map[zone_name + ".default_sql"] = config return flow_control_config_map def get_depend_tables(config_path, zone_name): zones_path = os.path.join(config_path, "zones") if not os.path.exists(zones_path): return get_depend_tables_from_sql_config(config_path) turing_options = get_turing_options_config(config_path, zone_name) return turing_options.get("dependency_table", []) def get_sql_config(config_path): sql_json_file = os.path.join(config_path, "sql.json") if (os.path.exists(sql_json_file)): d = get_file_as_json(sql_json_file) lack_result_enable = d.pop('lack_result_enable', None) result_allow_soft_failure = d.pop('result_allow_soft_failure', lack_result_enable) if result_allow_soft_failure is not None: d['result_allow_soft_failure'] = result_allow_soft_failure return d else: return {} def get_iquan_jni_config(config_path): sql_config = get_sql_config(config_path) return sql_config.get("iquan_jni_config", {}) def get_iquan_client_config(config_path): sql_config = get_sql_config(config_path) return sql_config.get("iquan_client_config", {}) def get_iquan_warmup_config(config_path): sql_config = get_sql_config(config_path) warmup_config = sql_config.get("iquan_warmup_config", {}) if "warmup_file_path" in warmup_config and warmup_config["warmup_file_path"] != "": warmup_config["warmup_file_path"] = os.path.join(config_path, warmup_config["warmup_file_path"]) return warmup_config def get_enable_inner_docid_optimize(config_path): sql_config = get_sql_config(config_path) return sql_config.get("inner_docid_optimize_enable", False) def get_db_name(config_path): sql_config = get_sql_config(config_path) return sql_config.get("db_name", "") def get_db_name_alias(config_path): sql_config = get_sql_config(config_path) return sql_config.get("db_name_alias", {}) def get_format_type(config_path): sql_config = get_sql_config(config_path) return sql_config.get("output_format", "") def get_swift_writer_config(config_path): sql_config = get_sql_config(config_path) if "swift_writer_config" in sql_config: return sql_config["swift_writer_config"] else: return {"table_read_write_config": {}} def get_table_writer_config(config_path): sql_config = get_sql_config(config_path) if "table_writer_config" in sql_config: return sql_config["table_writer_config"] else: return {"zone_names": []} def get_one_function_config(config_path, f): one_function_file = os.path.join(config_path, f) one_function_map = get_file_as_json(one_function_file) if "functions" in one_function_map: return one_function_map["functions"] else: return [] def get_function_config(config_path, zone_name): zone_config = get_zone_config(config_path, zone_name) if "function_config" in zone_config: function_config = zone_config["function_config"] if "cava_functions" in function_config: del function_config["cava_functions"] function_config["config_path"] = config_path if "modules" in function_config: one_function_configs = [] for module in function_config["modules"]: if "parameters" in module: parameters = module["parameters"] if "config_path" in parameters: one_function_configs += get_one_function_config(config_path, parameters["config_path"]) if "functions" in function_config: function_config["functions"] += one_function_configs else: function_config["functions"] = one_function_configs del function_config["modules"] return function_config else: return {} def get_function_plugins(config_path, load_zone_name): so_list = [] for zone_name in get_zone_list(config_path): if load_zone_name != "" and zone_name != load_zone_name: continue zone_config = get_zone_config(config_path, zone_name) func_config = zone_config.get("function_config", {}) if "modules" not in func_config: continue modules = func_config["modules"] for module in modules: so_list.append(get_module_abs_path(config_path, module["module_path"])) return list(set(so_list)) def get_auth_config(config_path): sql_config = get_sql_config(config_path) return sql_config.get("authentication_config", {}) def get_qrs_config(config_path): qrs_json_file = os.path.join(config_path, "qrs.json") return get_file_as_json(qrs_json_file) def get_qrs_cava_function_infos(config_path): json_info = {} json_file = os.path.join(config_path, "qrs_func.json") json_info = get_file_as_json(json_file) return json_info.get("cava_functions", []) def get_qrs_cava_config(config_path): biz_json_file = os.path.join(config_path, "biz.json") biz_config = get_file_as_json(biz_json_file) if "cava_config" in biz_config: return biz_config["cava_config"] qrs_config = get_qrs_config(config_path) return qrs_config.get("cava_config", {}) def get_cava_plugin_manager_config(config_path, zone_name, only_qrs): if only_qrs: return get_qrs_cava_plugin_manager_config(config_path) else: return get_searcher_cava_plugin_manager_config(config_path, zone_name) def get_qrs_cava_plugin_manager_config(config_path): auto_register = [] cava_config = get_qrs_cava_config(config_path) if "auto_register_function_packages" in cava_config: auto_register = cava_config["auto_register_function_packages"] return { "function_infos": get_qrs_cava_function_infos(config_path), "auto_register_function_packages": auto_register } def get_searcher_cava_plugin_manager_config(config_path, zone_name): auto_register = [] infos = [] zone_config = get_zone_config(config_path, zone_name) if "function_config" in zone_config: function_config = zone_config["function_config"] if "cava_functions" in function_config: infos += function_config["cava_functions"] if "cava_config" in zone_config: cava_config = zone_config["cava_config"] if "auto_register_function_packages" in cava_config: auto_register = cava_config["auto_register_function_packages"] return { "function_infos": infos, "auto_register_function_packages": auto_register } def get_cava_config(config_path, zone_name, is_qrs): config = {} if not is_qrs: zone_config = get_zone_config(config_path, zone_name) if "cava_config" in zone_config: config = zone_config["cava_config"] else: config = get_qrs_cava_config(config_path) config["cava_conf"] = "../binary/usr/local/etc/sql/sql_cava_config.json" return { "config_path": config_path, "cava_config": config } def get_logic_table_config(config_path): tables = {} base_path = os.path.join(config_path, "sql") if not os.path.exists(base_path): return tables files = os.listdir(base_path) suffix = "_logictable.json" for f in files: file_path = os.path.join(base_path, f) if not f.endswith(suffix): continue db_name = f[:-len(suffix)] info = get_file_as_json(os.path.join(base_path, f)) if "tables" in info: tables[db_name] = info["tables"] logging.info("add logic table file: " + file_path + ", db: " + db_name + ": " + json.dumps(info)) return tables def get_layer_table_config(config_path): tables = {} base_path = os.path.join(config_path, "sql") if not os.path.exists(base_path): return tables files = os.listdir(base_path) suffix = "_layer_table.json" for f in files: file_path = os.path.join(base_path, f) if not f.endswith(suffix): continue db_name = f[:-len(suffix)] info = get_file_as_json(os.path.join(base_path, f)) if "layer_tables" in info: tables[db_name] = info["layer_tables"] logging.info("add layer table file: " + file_path + "db: " + db_name + ": " + json.dumps(info)) return tables def get_agg_plugins(config_path): sql_config = get_sql_config(config_path) so_list = [] if "sql_agg_plugin_config" in sql_config: agg_config = sql_config["sql_agg_plugin_config"] if "modules" in agg_config: modules = agg_config["modules"] for module in modules: if "module_path" in module: so_list.append(get_module_abs_path(config_path, module["module_path"])) return so_list def get_tvf_plugins(config_path): sql_config = get_sql_config(config_path) so_list = [] if "sql_tvf_plugin_config" in sql_config: tvf_config = sql_config["sql_tvf_plugin_config"] if "modules" in tvf_config: modules = tvf_config["modules"] for module in modules: if "module_path" in module: so_list.append(get_module_abs_path(config_path, module["module_path"])) return so_list def get_module_abs_path(config_path, module_path): plugin_path = os.path.join(config_path, "plugins", module_path) if os.path.exists(plugin_path): return plugin_path else: return module_path def get_tvf_plugin_config(config_path): sql_config = get_sql_config(config_path) if "sql_tvf_plugin_config" in sql_config: tvf_config = sql_config["sql_tvf_plugin_config"] tvf_config["config_path"] = config_path return tvf_config return { "config_path": config_path, } def get_tvf_resource_config(config_path): tvf_plugin_config = get_tvf_plugin_config(config_path) if "tvf_profiles" not in tvf_plugin_config: return [] profiles = tvf_plugin_config["tvf_profiles"] config_map = {} for profile in profiles: if "func_name" not in profile: continue func_name = profile["func_name"] if "tvf_name" not in profile: continue if func_name not in config_map: config_map[func_name] = [profile] else: config_map[func_name].append(profile) resource_config = [] for func, profiles in config_map.items(): resource_config.append({ "name": func, "config": { "config_path": config_path, "tvf_profiles": profiles } }) return resource_config def get_enable_turbojet(config_path): sql_config = get_sql_config(config_path) return sql_config.get("enable_turbojet", False) def get_enable_scan_timeout(config_path): sql_config = get_sql_config(config_path) return sql_config.get("enable_scan_timeout", True) def get_external_table_config(config_path): sql_config = get_sql_config(config_path) return sql_config.get("external_table_config", {}) def get_external_gig_config(config_path): external_config = get_external_table_config(config_path) gig_config = external_config.get("gig_config", {}) return gig_config.get("subscribe", {}), gig_config.get("flow_control", {}) def get_flow_control_config_from_sql_config(config_path): sql_config = get_sql_config(config_path) flow_control_config_map = {} flow_config = sql_config.get("multi_call_config_sql", {}) for zone_name, config in flow_config.items(): flow_control_config_map[zone_name + ".default_sql"] = config return flow_control_config_map def get_depend_tables_from_sql_config(config_path): sql_config = get_sql_config(config_path) return sql_config.get('depend_tables', []) def get_item_table_name_from_sql_config(config_path): sql_config = get_sql_config(config_path) return sql_config.get('item_table', '')