tools/legacy_sql_tranlsation_helper/translatorCommaJoin.py (110 lines of code) (raw):

"""Rewrite a BigQuery legacy-SQL comma join as a UNION ALL query.

The script takes a legacy SQL statement of the form
``SELECT <cols> FROM [p:d.t1], [p:d.t2] [WHERE ...]``, looks up the schema of
every listed table, and prints one ``SELECT`` per table joined with
``UNION ALL``. Columns a table does not have are emitted as ``null as <col>``.

Usage: paste the statement into ``legacy_sql`` below, or set ``legacy_sql`` to
an empty string and point ``file_path`` at a file containing it.
"""

import re

from google.cloud import bigquery

import readSQLToString

legacy_sql = """Replace your SQL here. """
file_path = "sample_sql/sample1.sql"

# The file is only read when the inline statement has been emptied.
if file_path != "" and legacy_sql == "":
    legacy_sql = readSQLToString.read_file_path(file_path)


class translatorCommaJoin:
    """Translate a legacy-SQL comma join into a UNION ALL statement."""

    # Optional mapping of column name -> SQL type. Every "null as <col>"
    # placeholder whose column is listed here is rewritten by
    # cast_null_type() to "cast(null as <type>) as <col>".
    col_type = {}
    # Shared BigQuery client; created once, when the class body is executed.
    client = bigquery.Client()

    # Fetch selected columns. Manual adjustment needed when having comma in
    # the functions in the select statement.
    def list_sel_column(self, legacy_sql):
        """Return the column expressions between ``SELECT`` and ``FROM``.

        The text is split on every comma, so an expression that itself
        contains a comma (for example a function call with several
        arguments) is split into several fragments; see the hook in the loop
        below. Keyword lookup is case-sensitive and uses the first
        occurrence of each keyword.

        Args:
            legacy_sql: The legacy SQL statement.

        Returns:
            The non-empty column expressions, stripped of newlines and
            surrounding whitespace, in their original order.

        Raises:
            ValueError: If ``SELECT`` or ``FROM`` is not present.
        """
        sub1 = "SELECT"
        sub2 = "FROM"
        idx1 = legacy_sql.index(sub1)
        idx2 = legacy_sql.index(sub2)
        # "+ 1" skips the single separator character that follows SELECT.
        col_str = legacy_sql[idx1 + len(sub1) + 1:idx2]

        initial_col_list = []
        for i, fragment in enumerate(col_str.split(",")):
            # Manual adjustment needed when having comma in the functions in
            # the select statement: replace the string below with the index
            # of the fragment to drop. As shipped, the comparison of an int
            # with a string is never true, so nothing is dropped.
            if i == "to be updated according to your sql":
                continue
            column = fragment.replace("\n", "").strip()
            # An empty fragment (e.g. from a trailing comma) would otherwise
            # be emitted later as "null as , ", which is not valid SQL.
            if column:
                initial_col_list.append(column)
        return initial_col_list

    def list_table(self, legacy_sql):
        """Return the tables listed between ``FROM`` and ``WHERE``.

        When the statement has no ``WHERE``, everything after ``FROM`` is
        treated as the table list. Each legacy reference such as
        ``[project:dataset.table]`` is converted to
        `` `project.dataset.table` ``.

        Args:
            legacy_sql: The legacy SQL statement.

        Returns:
            The backtick-quoted table references, in their original order.

        Raises:
            ValueError: If ``FROM`` is not present.
        """
        sub1 = "FROM"
        sub2 = "WHERE"
        idx1 = legacy_sql.index(sub1)
        if sub2 in legacy_sql:
            idx2 = legacy_sql.index(sub2)
        else:
            idx2 = len(legacy_sql)
        # "+ 1" skips the single separator character that follows FROM.
        table_str = legacy_sql[idx1 + len(sub1) + 1:idx2]

        res_table_list = []
        for fragment in table_str.split(","):
            reference = fragment.replace("\n", "").strip()
            # Skip empty fragments; they would become the table name "``".
            if not reference:
                continue
            standard_name = (
                reference.replace("[", "").replace(":", ".").replace("]", "")
            )
            res_table_list.append(f"`{standard_name}`")
        return res_table_list

    def parse_table_name(self, table):
        """Split a `` `project.dataset.table` `` reference into its parts.

        Args:
            table: A backtick-quoted, dot-separated table reference.

        Returns:
            A tuple ``(table_name, dataset_id, project_id)`` in which
            ``table_name`` is wrapped in single quotes and the other two
            elements are bare identifiers.

        Raises:
            IndexError: If the reference has fewer than three dot-separated
                parts.
        """
        parts = table.split(".")
        table_name = "'" + parts[2].rstrip("`") + "'"
        dataset_id = parts[1]
        project_id = parts[0].lstrip("`")
        return table_name, dataset_id, project_id

    def list_exist_col(self, table_name):
        """Fetch a table's schema as a ``{column name: field type}`` dict.

        Args:
            table_name: Table reference; backticks are removed before the
                lookup.

        Returns:
            A dict mapping every schema field name to its ``field_type``.
        """
        table = self.client.get_table(table_name.replace("`", ""))
        return {field.name: field.field_type for field in table.schema}

    def check_col_exist(self, table, initial_col_list):
        """Report which of the selected columns exist in ``table``.

        The result is also printed.

        Args:
            table: Table reference accepted by ``list_exist_col``.
            initial_col_list: The selected column names.

        Returns:
            A dict mapping each selected column to 1 if the table has a
            column of that name and to 0 otherwise.
        """
        exist_col_type = self.list_exist_col(table)
        # BigQuery column names are case-insensitive, so compare folded
        # names; a case-sensitive test would null out a column that exists.
        existing = {name.casefold() for name in exist_col_type}
        exist_dict = {
            sel_col: 1 if sel_col.casefold() in existing else 0
            for sel_col in initial_col_list
        }
        print("For Table : ")
        print(table)
        print("Column check result :" )
        print(exist_dict)
        return exist_dict

    def new_select(self, table, initial_col_list):
        """Build the ``SELECT`` statement for a single table.

        Columns present in the table are selected as they are; missing ones
        are emitted as ``null as <col>``. Every item, including the last, is
        followed by ", " (GoogleSQL accepts a trailing comma in the select
        list). The statement is also printed.

        Args:
            table: Backtick-quoted table reference.
            initial_col_list: The selected column names.

        Returns:
            The generated ``SELECT ... FROM <table>`` string.
        """
        exist_dict = self.check_col_exist(table, initial_col_list)
        pieces = ["SELECT "]
        for col in initial_col_list:
            if exist_dict.get(col) == 1:
                pieces.append(col + ", ")
            else:
                pieces.append("null as " + col + ", ")
        pieces.append(" FROM " + table)
        select_sql = "".join(pieces)
        print("the new select statement for" + table + " : " + select_sql)
        return select_sql

    def build_new_sql(self, res_table_list, initial_col_list):
        """Join the per-table ``SELECT`` statements with ``UNION ALL``.

        Args:
            res_table_list: Backtick-quoted table references.
            initial_col_list: The selected column names.

        Returns:
            The combined statement; an empty string when no table is given.
        """
        return " UNION ALL ".join(
            self.new_select(table, initial_col_list)
            for table in res_table_list
        )

    def cast_null_type(self, union_all_sql):
        """Give ``null as <col>`` placeholders the types from ``col_type``.

        Every ``null as <col>`` whose column is a key of ``self.col_type``
        becomes ``cast(null as <type>) as <col>``. Column names are matched
        literally and as whole names, in a single pass, so ``id`` does not
        rewrite ``null as id_x`` and text inserted for one column is never
        re-matched for another.

        Args:
            union_all_sql: SQL produced by ``build_new_sql``.

        Returns:
            The SQL with typed nulls; unchanged when ``col_type`` is empty.
        """
        if not self.col_type:
            return union_all_sql

        # Longest names first so that a name which is a prefix of another
        # (e.g. "a" and "a.b") cannot shadow the longer one.
        names = sorted(self.col_type, key=len, reverse=True)
        alternatives = "|".join(re.escape(name) for name in names)
        pattern = re.compile(r"\bnull as ({})(?!\w)".format(alternatives))

        def typed_null(match):
            col = match.group(1)
            return "cast(null as {}) as {}".format(self.col_type[col], col)

        # A function replacement keeps backslashes in type or column names
        # from being interpreted as regex group references.
        return pattern.sub(typed_null, union_all_sql)

    def translate_comma_join(self, legacy_sql):
        """Translate ``legacy_sql`` and print every intermediate result.

        Args:
            legacy_sql: The legacy SQL statement containing the comma join.

        Returns:
            The translated SQL (the same text that is printed last).

        Raises:
            ValueError: If ``SELECT`` or ``FROM`` is not present.
        """
        initial_col_list = self.list_sel_column(legacy_sql)
        print("Initially selected columns are :")
        print(initial_col_list)
        res_table_list = self.list_table(legacy_sql)
        print("Selected tables are : ")
        print(res_table_list)
        union_all_sql = self.build_new_sql(res_table_list, initial_col_list)
        output_sql = self.cast_null_type(union_all_sql)
        print("Output SQL is : " )
        print(output_sql)
        return output_sql


obj = translatorCommaJoin()
obj.translate_comma_join(legacy_sql)