in utilities/Hive_metastore_migration/src/hive_metastore_migration.py [0:0]
def transform_tables(self, db_tbl_joined, ms_table_params, storage_descriptors, ms_partition_keys):
    """Build the table entities from the joined database/table rows.

    Pipeline, in order:
      1. Run ``transform_timestamp_cols`` over CREATE_TIME / LAST_ACCESS_TIME.
      2. Attach the table parameters (keyed by TBL_ID) via ``join_with_params``.
      3. Left-outer join the transformed partition keys on TBL_ID.
      4. Attach the storage descriptors on SD_ID under a single
         ``storageDescriptor`` column.
      5. Rename the metastore column names to their camelCase counterparts.
      6. Drop the ID columns and any row whose ``name`` or ``database`` is null.
      7. Pass ``partitionKeys`` through ``fill_none_with_empty_list``.
      8. Select ``database`` plus an ``item`` struct, and add a ``type``
         column holding the literal ``'table'``.

    :param db_tbl_joined: table rows already joined with their database
        (must expose the NAME, TBL_ID and SD_ID columns used below)
    :param ms_table_params: raw table parameters, keyed by TBL_ID
    :param storage_descriptors: storage descriptors, keyed by SD_ID
    :param ms_partition_keys: raw partition key rows, keyed by TBL_ID
    :return: DataFrame produced by selecting ``database`` and ``item`` and
        then adding ``type``
    """
    timestamp_columns = {
        'CREATE_TIME': 'createTime',
        'LAST_ACCESS_TIME': 'lastAccessTime'
    }
    with_timestamps = self.transform_timestamp_cols(db_tbl_joined, date_cols_map=timestamp_columns)

    table_params = self.transform_param_value(ms_table_params)
    with_params = self.join_with_params(df=with_timestamps, df_params=table_params, id_col='TBL_ID')

    part_keys = self.transform_ms_partition_keys(ms_partition_keys)
    with_part_keys = with_params.join(other=part_keys, on='TBL_ID', how='left_outer')
    with_storage = with_part_keys.join_other_to_single_column(
        other=storage_descriptors, on='SD_ID', how='left_outer', new_column_name='storageDescriptor')

    # NOTE(review): CREATE_TIME and LAST_ACCESS_TIME are listed both here and
    # in the timestamp map above; whether these two renames still have any
    # effect depends on what transform_timestamp_cols returns -- confirm.
    column_renames = [
        ('NAME', 'database'),
        ('TBL_NAME', 'name'),
        ('TBL_TYPE', 'tableType'),
        ('CREATE_TIME', 'createTime'),
        ('LAST_ACCESS_TIME', 'lastAccessTime'),
        ('OWNER', 'owner'),
        ('RETENTION', 'retention'),
        ('VIEW_EXPANDED_TEXT', 'viewExpandedText'),
        ('VIEW_ORIGINAL_TEXT', 'viewOriginalText'),
    ]
    renamed = rename_columns(df=with_storage, rename_tuples=column_renames)

    id_columns = ['DB_ID', 'TBL_ID', 'SD_ID', 'LINK_TARGET_ID']
    without_ids = renamed.drop_columns(id_columns)

    # A table entity is unusable without both its own name and its database.
    valid_tables = without_ids.na.drop(how='any', subset=['name', 'database'])
    with_part_cols = HiveMetastoreTransformer.fill_none_with_empty_list(valid_tables, 'partitionKeys')

    # The item struct is built from the post-drop column list, passed through
    # remove(..., 'database'); 'database' is selected as its own column.
    item_columns = remove(without_ids.columns, 'database')
    selected = with_part_cols.select('database', struct(item_columns).alias('item'))
    return selected.withColumn('type', lit('table'))