def batch_items_within_partition()

in utilities/Hive_metastore_migration/src/hive_metastore_migration.py


from pyspark.sql import Row
from pyspark.sql.types import ArrayType, StructField, StructType


def batch_items_within_partition(sql_context, df, key_col, value_col, values_col):
    """
    Group a DataFrame of key, value pairs, create a list of values for the same key in each spark partition, but there
    is no cross-partition data interaction, so the same key may be shown multiple times in the output dataframe
    :param sql_context: spark sqlContext
    :param df: DataFrame with only two columns, a key_col and a value_col
    :param key_col: name of key column
    :param value_col: name of value column
    :param values_col: name of values column, which is an array of value_col
    :type df: DataFrame
    :type key_col: str
    :type value_col: str
    :return: DataFrame of values grouped by key within each partition
    """
    def group_by_key(it):
        # Accumulate values per key for the rows of a single partition only.
        grouped = dict()
        for row in it:
            (k, v) = (row[key_col], row[value_col])
            if k in grouped:
                grouped[k].append(v)
            else:
                grouped[k] = [v]
        # Emit one Row per key seen in this partition.
        row = Row(key_col, values_col)
        for k in grouped:
            yield row(k, grouped[k])

    # Build the result with an explicit schema: the key keeps its original type and the
    # values column is an array of the value column's type. get_schema_type is a helper
    # defined elsewhere in hive_metastore_migration.py.
    return sql_context.createDataFrame(data=df.rdd.mapPartitions(group_by_key), schema=StructType([
        StructField(key_col, get_schema_type(df, key_col), True),
        StructField(values_col, ArrayType(get_schema_type(df, value_col)), True)
    ]))
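
Because grouping happens independently inside each partition, the same key can surface in several output rows. Below is a minimal usage sketch, assuming a local SparkSession; the two-partition repartition, the column names k/v/vs, and the inlined get_schema_type stand-in (the real helper lives elsewhere in hive_metastore_migration.py) are illustrative assumptions, not part of this file's API.


from pyspark.sql import SQLContext, SparkSession


def get_schema_type(df, column_name):
    # Assumed stand-in for the helper defined elsewhere in
    # hive_metastore_migration.py: resolve a column's Spark DataType.
    return df.select(column_name).schema.fields[0].dataType


spark = SparkSession.builder.master("local[2]").appName("batch-demo").getOrCreate()
sql_context = SQLContext(spark.sparkContext)

# Hypothetical input: four (k, v) rows spread over two partitions.
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("b", 3), ("a", 4)], ["k", "v"]
).repartition(2)

out = batch_items_within_partition(sql_context, df, "k", "v", "vs")
out.show(truncate=False)
# Depending on how rows land in partitions, "a" may appear in more than
# one output row, e.g. rows ("a", [1, 2]), ("b", [3]) and ("a", [4]).

Passing an explicit schema to createDataFrame avoids a second pass over the data to infer types, which is why the function asks for the key and value column names up front.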