in utilities/Hive_metastore_migration/src/hive_metastore_migration.py [0:0]
def transform_skewed_values_and_loc_map(self, ms_skewed_string_list_values, ms_skewed_col_value_loc_map):
    """Build per-SD_ID skewed column values and skewed-value -> location maps.

    Each skewed string list is rendered to a single string (via
    ``udf_skewed_values_to_str``). The result is inner-joined to the
    value/location map on ``STRING_LIST_ID_KID == STRING_LIST_ID`` and then
    grouped by ``SD_ID`` twice:

    - once into a ``value string -> LOCATION`` map;
    - once into a list of the same value strings.

    Args:
        ms_skewed_string_list_values: skewed string-list values from the
            metastore. They are passed unchanged to
            ``transform_ms_skewed_string_list_values`` (presumably the
            SKEWED_STRING_LIST_VALUES table -- TODO confirm).
        ms_skewed_col_value_loc_map: DataFrame with at least the columns
            ``SD_ID``, ``STRING_LIST_ID_KID`` and ``LOCATION``.

    Returns:
        Tuple ``(skewed_column_values, skewed_column_value_location_maps)``:

        - ``skewed_column_values`` has columns
          (SD_ID: Long, skewedColumnValues: Array[String]);
        - ``skewed_column_value_location_maps`` has columns
          (SD_ID, skewedColumnValueLocationMaps: Map[String, String]).

    Note:
        Because of the inner join, only skewed values that have a row in
        ``ms_skewed_col_value_loc_map`` appear in either output.
    """
    # columns: (STRING_LIST_ID:BigInt, skewedColumnValuesList:List[String])
    skewed_values_list = self.transform_ms_skewed_string_list_values(ms_skewed_string_list_values)

    # columns: (STRING_LIST_ID:BigInt, skewedColumnValuesStr:String)
    skewed_value_str = self.modify_column_by_udf(df=skewed_values_list,
                                                 udf=HiveMetastoreTransformer.udf_skewed_values_to_str(),
                                                 column_to_modify='skewedColumnValuesList',
                                                 new_column_name='skewedColumnValuesStr')

    # columns: (SD_ID: BigInt, STRING_LIST_ID_KID: BigInt, STRING_LIST_ID: BigInt,
    # LOCATION: String, skewedColumnValuesStr: String)
    skewed_value_str_with_loc = ms_skewed_col_value_loc_map.join(
        other=skewed_value_str,
        on=[ms_skewed_col_value_loc_map['STRING_LIST_ID_KID'] == skewed_value_str['STRING_LIST_ID']],
        how='inner')

    # columns: (SD_ID: BigInt, skewedColumnValueLocationMaps: Map[String, String])
    skewed_column_value_location_maps = self.kv_pair_to_map(df=skewed_value_str_with_loc,
                                                            id_col='SD_ID',
                                                            key='skewedColumnValuesStr',
                                                            value='LOCATION',
                                                            map_col_name='skewedColumnValueLocationMaps')

    # columns: (SD_ID: BigInt, skewedColumnValues: List[String])
    # The joined frame has no 'skewedColumnValues' column (see the column list
    # above), so collect the rendered 'skewedColumnValuesStr' strings. These
    # are also the keys of skewedColumnValueLocationMaps, which keeps the
    # values list and the location map consistent.
    skewed_column_values = self.sql_context.createDataFrame(
        data=skewed_value_str_with_loc.rdd.map(
            lambda row: (row['SD_ID'], row['skewedColumnValuesStr'])
        ).aggregateByKey([], append, extend),
        schema=StructType([
            StructField(name='SD_ID', dataType=LongType()),
            StructField(name='skewedColumnValues', dataType=ArrayType(elementType=StringType()))
        ]))

    return skewed_column_values, skewed_column_value_location_maps