in utilities/Hive_metastore_migration/src/hive_metastore_migration.py [0:0]
def batch_items_within_partition(sql_context, df, key_col, value_col, values_col):
    """
    Group a DataFrame of (key, value) pairs into (key, list-of-values) rows
    within each Spark partition. There is no cross-partition data interaction,
    so the same key may appear multiple times in the output DataFrame — once
    per partition that contains it.
    :param sql_context: spark sqlContext
    :param df: DataFrame with only two columns, a key_col and a value_col
    :param key_col: name of key column
    :param value_col: name of value column
    :param values_col: name of values column, which is an array of value_col
    :type df: DataFrame
    :type key_col: str
    :type value_col: str
    :type values_col: str
    :return: DataFrame of values grouped by key within each partition
    :rtype: DataFrame
    """
    def group_by_key(it):
        # Accumulate values per key for this partition only.
        grouped = dict()
        for row in it:
            # setdefault creates the list on first sight of a key and
            # appends in either case — replaces the manual if/else branch.
            grouped.setdefault(row[key_col], []).append(row[value_col])
        # Row template with the output column names; named out_row so it
        # does not shadow the loop variable `row` above.
        out_row = Row(key_col, values_col)
        for k in grouped:
            yield out_row(k, grouped[k])
    return sql_context.createDataFrame(data=df.rdd.mapPartitions(group_by_key), schema=StructType([
        StructField(key_col, get_schema_type(df, key_col), True),
        StructField(values_col, ArrayType(get_schema_type(df, value_col)), True)
    ]))