in tools/hive-bigquery/hive_to_bigquery/hive_table_model.py [0:0]
def flatten_schema(self):
    """Returns Hive table schema in flat structure.

    Nested data types in Hive schema are represented using '<'. For
    example, array of integers is represented as 'array<int>'. Similarly,
    maps and structs are represented too. To compare the data types in
    Hive and BigQuery, this schema needs to be flattened out and then the
    internal data type can be compared.

    For example col_name(map<string,array<int>>) in Hive is flattened as
    {
        "col_name" : "map",
        "col_name__key" : "string",
        "col_name__value" : "array_int"
    }

    Uses string extraction to flatten the schema. Reads ``self.schema``,
    iterating over it as a mapping of column name to Hive type string.

    Flattening rules:
        * array<T>: the column keeps its name; every level of array
          nesting adds an 'array_' prefix to the element type.
        * map<K,V>: emits 'map' for the column, 'string' for
          '<name>__key' (the key type is always reported as 'string')
          and flattens V under '<name>__value'.
        * struct<f1:T1,...>: emits 'struct' for the column and flattens
          every field under '<name>__<field>'.
        * uniontype<...>: emits 'union'; members are not flattened.
        * Any type containing 'decimal', 'varchar' or 'char' is reduced
          to that bare keyword (precision/length is dropped).

    Returns:
        OrderedDict: A dictionary mapping flattened columns and their data
        types, in order of first appearance.

    Raises:
        ValueError: If a map/struct type definition has unbalanced
            '<>' or '()' brackets.
    """

    def split_top_level(type_list):
        """Splits a comma separated list of types at top-level commas.

        Commas nested inside '<...>' (complex types) or '(...)'
        (e.g. decimal(10,2)) do not separate items.

        Args:
            type_list (str): Contents between the outer '<' and '>'.

        Returns:
            list: The top-level items, in order.
        """
        pieces = []
        pending = []
        angle_balance = 0
        paren_balance = 0
        for fragment in type_list.split(','):
            pending.append(fragment)
            angle_balance += fragment.count('<') - fragment.count('>')
            paren_balance += fragment.count('(') - fragment.count(')')
            # A fragment run is a complete item only once every opened
            # bracket has been closed again.
            if angle_balance == 0 and paren_balance == 0:
                pieces.append(','.join(pending))
                pending = []
        if pending:
            raise ValueError(
                "Unbalanced brackets in Hive type definition: "
                "{!r}".format(type_list))
        return pieces

    def recursively_flatten(name, item_type):
        """Iterates through the nested fields and gets the data types.

        Appends (column name, type) pairs to ``flattened``. Keeping the
        name and type together guarantees they can never get out of step.

        Args:
            name (str): Flattened column name.
            item_type (str): Hive type string of the column.
        """
        if '<' not in item_type:
            flattened.append((name, item_type))
            return

        outer_type, _, remainder = item_type.partition('<')
        # Drop the '>' that closes the outer type.
        inner = remainder[:-1]

        # If type is array, recursively flatten the nested structure.
        if outer_type == 'array':
            flattened.append((name, 'array'))
            recursively_flatten(name, inner)
        # If type is map, recursively flatten the value in the map.
        elif outer_type == 'map':
            flattened.append((name, 'map'))
            flattened.append((name + '__key', 'string'))
            # Everything after the first top-level comma is the value.
            value_type = ','.join(split_top_level(inner)[1:])
            recursively_flatten(name + '__value', value_type)
        elif outer_type == 'uniontype':
            flattened.append((name, 'union'))
        # If type is struct, recursively flatten all the fields inside.
        elif outer_type == 'struct':
            flattened.append((name, 'struct'))
            for field in split_top_level(inner):
                field_name, _, field_type = field.partition(':')
                recursively_flatten(name + '__' + field_name, field_type)
        else:
            # Unrecognised parameterised type: keep it verbatim so the
            # column still gets exactly one type entry.
            flattened.append((name, item_type))

    flattened = []
    for name, col_type in self.schema.items():
        recursively_flatten(name, col_type)

    # Group the collected types per column; array columns appear once per
    # nesting level plus once for the element type.
    grouped = OrderedDict()
    for column, data_type in flattened:
        grouped.setdefault(str(column), []).append(str(data_type))

    col_dict = OrderedDict()
    for column, type_list in grouped.items():
        if len(type_list) >= 2:
            element_type = [t for t in type_list if t != 'array'][0]
            flat_type = 'array_' * type_list.count('array') + element_type
        else:
            flat_type = type_list[0]
        # Strip precision/length. 'varchar' must be tested before 'char'
        # because the former contains the latter.
        # NOTE(review): this is a substring match, so e.g.
        # 'array_decimal(10,2)' also collapses to plain 'decimal'. Kept as
        # in the original -- confirm that callers rely on this.
        for keyword in ('decimal', 'varchar', 'char'):
            if keyword in flat_type:
                flat_type = keyword
                break
        col_dict[column] = flat_type
    return col_dict