in utilities/Hive_metastore_migration/src/hive_metastore_migration.py [0:0]
def transform_partitions(self, db_tbl_joined, ms_partitions, storage_descriptors, ms_partition_params,
ms_partition_key_vals):
    """Convert raw metastore PARTITIONS rows into catalog partition entities.

    Joins the partitions with their owning database/table names, partition
    parameters, storage descriptors and partition key values, then reshapes
    each row into (database, table, item, type='partition').

    :param db_tbl_joined: DataFrame of DBS joined with TBLS; provides NAME,
        TBL_NAME, DB_ID and TBL_ID columns
    :param ms_partitions: DataFrame of metastore PARTITIONS rows
    :param storage_descriptors: DataFrame of storage descriptors keyed by SD_ID
    :param ms_partition_params: DataFrame of metastore PARTITION_PARAMS rows
    :param ms_partition_key_vals: DataFrame of metastore PARTITION_KEY_VALS rows
    :return: DataFrame with columns (database, table, item, type)
    """
    # Convert integer epoch columns to timestamp representations.
    # NOTE(review): the map value 'creationTime' disagrees with the rename to
    # 'createTime' below — harmless if transform_timestamp_cols only uses the
    # keys, but confirm against its implementation.
    parts_date_transformed = self.transform_timestamp_cols(df=ms_partitions, date_cols_map={
        'CREATE_TIME': 'creationTime',
        'LAST_ACCESS_TIME': 'lastAccessTime'
    })
    # Attach the owning database and table names to each partition row.
    db_tbl_names = db_tbl_joined.select(db_tbl_joined['NAME'].alias('namespaceName'),
                                        db_tbl_joined['TBL_NAME'].alias('tableName'), 'DB_ID', 'TBL_ID')
    parts_with_db_tbl = parts_date_transformed.join(other=db_tbl_names, on='TBL_ID', how='inner')
    # Fold the PARTITION_PARAMS key/value rows into a parameters column.
    parts_with_params = self.join_with_params(
        df=parts_with_db_tbl, df_params=self.transform_param_value(ms_partition_params), id_col='PART_ID')
    # BUG FIX: join_other_to_single_column is a helper on this transformer, not
    # a DataFrame method — calling it on the DataFrame raised AttributeError.
    parts_with_sd = self.join_other_to_single_column(
        df=parts_with_params, other=storage_descriptors, on='SD_ID', how='left_outer',
        new_column_name='storageDescriptor')
    part_values = self.transform_ms_partition_key_vals(ms_partition_key_vals)
    parts_with_values = parts_with_sd.join(other=part_values, on='PART_ID', how='left_outer')
    parts_renamed = rename_columns(df=parts_with_values, rename_tuples=[
        ('CREATE_TIME', 'createTime'),
        ('LAST_ACCESS_TIME', 'lastAccessTime')
    ])
    # BUG FIX: DataFrame has no drop_columns method; use the standard drop(),
    # which accepts multiple column names and ignores ones that are absent.
    parts_dropped_cols = parts_renamed.drop(
        'DB_ID', 'TBL_ID', 'PART_ID', 'SD_ID', 'PART_NAME', 'LINK_TARGET_ID')
    # A partition row is unusable without key values or a resolvable
    # database/table name, so drop any such rows outright.
    parts_drop_invalid = parts_dropped_cols.na.drop(how='any', subset=['values', 'namespaceName', 'tableName'])
    # Pack every remaining column into a single 'item' struct and tag the
    # entity type so downstream writers can dispatch on it.
    parts_final = parts_drop_invalid.select(
        parts_drop_invalid['namespaceName'].alias('database'),
        parts_drop_invalid['tableName'].alias('table'),
        struct(parts_drop_invalid.columns).alias('item')
    ).withColumn('type', lit('partition'))
    return parts_final