def transform_partitions()

in utilities/Hive_metastore_migration/src/hive_metastore_migration.py [0:0]


    def transform_partitions(self, db_tbl_joined, ms_partitions, storage_descriptors, ms_partition_params,
                             ms_partition_key_vals):
        """Turn raw Hive metastore PARTITIONS rows into catalog partition entities.

        Joins each partition with its owning database/table names, its
        parameters, storage descriptor and partition key values, strips the
        metastore-internal id columns, and emits one row per partition shaped
        as (database, table, item) tagged with type='partition'.
        """
        # Epoch-int creation/access columns -> timestamp representation.
        # NOTE(review): the map values here ('creationTime') differ from the
        # rename below ('createTime'); how transform_timestamp_cols uses the
        # values isn't visible from this block — confirm against its definition.
        partitions = self.transform_timestamp_cols(df=ms_partitions, date_cols_map={
            'CREATE_TIME': 'creationTime',
            'LAST_ACCESS_TIME': 'lastAccessTime'
        })

        # Database/table names keyed by TBL_ID, aliased to the output names.
        names = db_tbl_joined.select(db_tbl_joined['NAME'].alias('namespaceName'),
                                     db_tbl_joined['TBL_NAME'].alias('tableName'), 'DB_ID', 'TBL_ID')

        # Stitch in the related entities one join at a time.
        partitions = partitions.join(other=names, on='TBL_ID', how='inner')
        partitions = self.join_with_params(
            df=partitions, df_params=self.transform_param_value(ms_partition_params), id_col='PART_ID')
        partitions = partitions.join_other_to_single_column(
            other=storage_descriptors, on='SD_ID', how='left_outer', new_column_name='storageDescriptor')
        key_values = self.transform_ms_partition_key_vals(ms_partition_key_vals)
        partitions = partitions.join(other=key_values, on='PART_ID', how='left_outer')

        # Camel-case the remaining raw columns, then drop metastore-internal ids.
        partitions = rename_columns(df=partitions, rename_tuples=[
            ('CREATE_TIME', 'createTime'),
            ('LAST_ACCESS_TIME', 'lastAccessTime')
        ])
        partitions = partitions.drop_columns([
            'DB_ID', 'TBL_ID', 'PART_ID', 'SD_ID', 'PART_NAME', 'LINK_TARGET_ID'
        ])

        # A partition is only valid with key values and an owning database/table.
        partitions = partitions.na.drop(how='any', subset=['values', 'namespaceName', 'tableName'])

        # Final shape: (database, table, item-struct) tagged as a partition.
        return partitions.select(
            partitions['namespaceName'].alias('database'),
            partitions['tableName'].alias('table'),
            struct(partitions.columns).alias('item')
        ).withColumn('type', lit('partition'))