def transform_skewed_values_and_loc_map()

in utilities/Hive_metastore_migration/src/hive_metastore_migration.py [0:0]


    def transform_skewed_values_and_loc_map(self, ms_skewed_string_list_values, ms_skewed_col_value_loc_map):
        """
        Build, per SD_ID, the list of skewed column value strings and the map from each
        skewed value string to its location.

        :param ms_skewed_string_list_values: input handed to
            transform_ms_skewed_string_list_values, which is expected to yield one
            list of skewed values per STRING_LIST_ID (see column comments below)
        :param ms_skewed_col_value_loc_map: DataFrame joined on its STRING_LIST_ID_KID
            column; its SD_ID and LOCATION columns are read afterwards
        :return: tuple (skewed_column_values, skewed_column_value_location_maps), where
            the first is a DataFrame with schema (SD_ID: Long, skewedColumnValues: Array[String])
            and the second is the result of kv_pair_to_map keyed by SD_ID
        """
        # columns: (STRING_LIST_ID:BigInt, skewedColumnValuesList:List[String])
        skewed_values_list = self.transform_ms_skewed_string_list_values(ms_skewed_string_list_values)

        # columns: (STRING_LIST_ID:BigInt, skewedColumnValuesStr:String)
        skewed_value_str = self.modify_column_by_udf(df=skewed_values_list,
                                                     udf=HiveMetastoreTransformer.udf_skewed_values_to_str(),
                                                     column_to_modify='skewedColumnValuesList',
                                                     new_column_name='skewedColumnValuesStr')

        # columns: (SD_ID: BigInt, STRING_LIST_ID_KID: BigInt, STRING_LIST_ID: BigInt,
        # LOCATION: String, skewedColumnValuesStr: String)
        skewed_value_str_with_loc = ms_skewed_col_value_loc_map \
            .join(other=skewed_value_str,
                  on=[ms_skewed_col_value_loc_map['STRING_LIST_ID_KID'] == skewed_value_str['STRING_LIST_ID']],
                  how='inner')

        # columns: (SD_ID: BigInt, skewedColumnValueLocationMaps: Map[String, String])
        skewed_column_value_location_maps = self.kv_pair_to_map(df=skewed_value_str_with_loc,
                                                                id_col='SD_ID',
                                                                key='skewedColumnValuesStr',
                                                                value='LOCATION',
                                                                map_col_name='skewedColumnValueLocationMaps')

        # columns: (SD_ID: BigInt, skewedColumnValues: List[String])
        # The joined frame carries the value strings under 'skewedColumnValuesStr' (the same
        # column kv_pair_to_map reads above); 'skewedColumnValues' is only the name of the
        # aggregated output column, so it must not be used to index the input rows.
        skewed_column_values = self.sql_context.createDataFrame(
            data=skewed_value_str_with_loc.rdd.map(
                lambda row: (row['SD_ID'], row['skewedColumnValuesStr'])
            ).aggregateByKey([], append, extend),  # collect all value strings of one SD_ID into a list
            schema=StructType([
                StructField(name='SD_ID', dataType=LongType()),
                StructField(name='skewedColumnValues', dataType=ArrayType(elementType=StringType()))
            ]))

        return skewed_column_values, skewed_column_value_location_maps