in sql/connect/common/src/main/scala/org/apache/spark/sql/connect/KeyValueGroupedDataset.scala [140:415]
private[sql] def transformWithState[U: Encoder](
    statefulProcessor: StatefulProcessor[K, V, U],
    timeMode: TimeMode,
    outputMode: OutputMode): Dataset[U] = {
  // Only the three mandatory arguments are supplied; the helper's `initialState` and
  // `eventTimeColumnName` parameters keep their declared defaults.
  transformWithStateHelper(statefulProcessor, timeMode, outputMode)
}
/**
 * @inheritdoc
 *
 * Wraps the given initial state in `Some` and routes the call through
 * `transformWithStateHelper`; the helper's event-time column name keeps its default.
 */
private[sql] def transformWithState[U: Encoder, S: Encoder](
    statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
    timeMode: TimeMode,
    outputMode: OutputMode,
    initialState: sql.KeyValueGroupedDataset[K, S]): Dataset[U] = {
  transformWithStateHelper(
    statefulProcessor,
    timeMode,
    outputMode,
    initialState = Some(initialState))
}
/**
 * @inheritdoc
 *
 * Routes the call through `transformWithStateHelper` with the time mode fixed to
 * `TimeMode.EventTime()` and the given event-time column name.
 */
override private[sql] def transformWithState[U: Encoder](
    statefulProcessor: StatefulProcessor[K, V, U],
    eventTimeColumnName: String,
    outputMode: OutputMode): Dataset[U] = {
  // This overload takes a column name instead of a time mode, so the mode is fixed here.
  val eventTimeMode = TimeMode.EventTime()
  // `initialState` is skipped on purpose so the helper's default applies.
  transformWithStateHelper(
    statefulProcessor,
    eventTimeMode,
    outputMode,
    eventTimeColumnName = eventTimeColumnName)
}
/**
 * @inheritdoc
 *
 * Routes the call through `transformWithStateHelper` with the time mode fixed to
 * `TimeMode.EventTime()`, the initial state wrapped in `Some`, and the given event-time
 * column name.
 */
override private[sql] def transformWithState[U: Encoder, S: Encoder](
    statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
    eventTimeColumnName: String,
    outputMode: OutputMode,
    initialState: sql.KeyValueGroupedDataset[K, S]): Dataset[U] = {
  transformWithStateHelper(
    statefulProcessor,
    TimeMode.EventTime(),
    outputMode,
    initialState = Some(initialState),
    eventTimeColumnName = eventTimeColumnName)
}
// Shared entry point that the three- and four-argument `transformWithState` overloads in
// this class call. The body here only invokes `unsupported()`, so this definition is a
// placeholder and must not be relied on directly; the working implementation is expected
// to come from an inheriting class.
// NOTE(review): presumably `KeyValueGroupedDatasetImpl` supplies that override and
// `unsupported()` throws — neither is visible from this part of the file, confirm.
//
// Parameters beyond the processor, time mode and output mode:
//   - initialState: optional initial state dataset; defaults to `None`.
//   - eventTimeColumnName: defaults to the empty string.
protected[sql] def transformWithStateHelper[U: Encoder, S: Encoder](
statefulProcessor: StatefulProcessor[K, V, U],
timeMode: TimeMode,
outputMode: OutputMode,
initialState: Option[sql.KeyValueGroupedDataset[K, S]] = None,
eventTimeColumnName: String = ""): Dataset[U] = unsupported()
// Overrides...
/**
 * @inheritdoc
 *
 * Adds no logic of its own: `func` and `encoder` are forwarded unchanged to the parent
 * implementation, and the result is declared as `KeyValueGroupedDataset[K, W]`.
 */
// NOTE(review): presumably this override exists only to pin the declared result type to
// this file's `KeyValueGroupedDataset` — confirm against the parent class.
override def mapValues[W](
func: MapFunction[V, W],
encoder: Encoder[W]): KeyValueGroupedDataset[K, W] = super.mapValues(func, encoder)
/**
 * @inheritdoc
 *
 * Scala-function variant. Adds no logic of its own: `f` is forwarded unchanged to the
 * parent implementation and the result is declared as `Dataset[U]`.
 */
override def flatMapGroups[U: Encoder](f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U] =
super.flatMapGroups(f)
/**
 * @inheritdoc
 *
 * `FlatMapGroupsFunction` variant with an explicit encoder. Adds no logic of its own:
 * `f` and `encoder` are forwarded unchanged to the parent implementation.
 */
override def flatMapGroups[U](
f: FlatMapGroupsFunction[K, V, U],
encoder: Encoder[U]): Dataset[U] = super.flatMapGroups(f, encoder)
/**
 * @inheritdoc
 *
 * Adds no logic of its own: the sort expressions, `f` and `encoder` are forwarded
 * unchanged to the parent implementation.
 */
// NOTE(review): `SortExprs` is capitalised, unlike every other parameter in this class.
// Presumably it mirrors the parent's parameter name; renaming it would affect callers
// that pass it by name, so it is left untouched.
override def flatMapSortedGroups[U](
SortExprs: Array[Column],
f: FlatMapGroupsFunction[K, V, U],
encoder: Encoder[U]): Dataset[U] = super.flatMapSortedGroups(SortExprs, f, encoder)
/**
 * @inheritdoc
 *
 * Scala-function variant. Adds no logic of its own: `f` is forwarded unchanged to the
 * parent implementation and the result is declared as `Dataset[U]`.
 */
override def mapGroups[U: Encoder](f: (K, Iterator[V]) => U): Dataset[U] = super.mapGroups(f)
/**
 * @inheritdoc
 *
 * `MapGroupsFunction` variant with an explicit encoder. Adds no logic of its own: `f`
 * and `encoder` are forwarded unchanged to the parent implementation.
 */
override def mapGroups[U](f: MapGroupsFunction[K, V, U], encoder: Encoder[U]): Dataset[U] =
super.mapGroups(f, encoder)
/**
 * @inheritdoc
 *
 * Variant without a timeout configuration or initial state. Adds no logic of its own:
 * the function and both encoders are forwarded unchanged to the parent implementation.
 */
override def mapGroupsWithState[S, U](
func: MapGroupsWithStateFunction[K, V, S, U],
stateEncoder: Encoder[S],
outputEncoder: Encoder[U]): Dataset[U] =
super.mapGroupsWithState(func, stateEncoder, outputEncoder)
/**
 * @inheritdoc
 *
 * Variant that additionally takes a `GroupStateTimeout`. Adds no logic of its own: all
 * four arguments are forwarded, in order, to the parent implementation.
 */
override def mapGroupsWithState[S, U](
func: MapGroupsWithStateFunction[K, V, S, U],
stateEncoder: Encoder[S],
outputEncoder: Encoder[U],
timeoutConf: GroupStateTimeout): Dataset[U] =
super.mapGroupsWithState(func, stateEncoder, outputEncoder, timeoutConf)
/**
 * @inheritdoc
 *
 * Variant that takes both a `GroupStateTimeout` and an initial-state dataset. Adds no
 * logic of its own: all five arguments are forwarded, in order, to the parent
 * implementation.
 */
override def mapGroupsWithState[S, U](
func: MapGroupsWithStateFunction[K, V, S, U],
stateEncoder: Encoder[S],
outputEncoder: Encoder[U],
timeoutConf: GroupStateTimeout,
initialState: sql.KeyValueGroupedDataset[K, S]): Dataset[U] =
super.mapGroupsWithState(func, stateEncoder, outputEncoder, timeoutConf, initialState)
/**
 * @inheritdoc
 *
 * Variant without an initial state. Adds no logic of its own: the function, output mode,
 * both encoders and the timeout configuration are forwarded, in order, to the parent
 * implementation.
 */
override def flatMapGroupsWithState[S, U](
func: FlatMapGroupsWithStateFunction[K, V, S, U],
outputMode: OutputMode,
stateEncoder: Encoder[S],
outputEncoder: Encoder[U],
timeoutConf: GroupStateTimeout): Dataset[U] =
super.flatMapGroupsWithState(func, outputMode, stateEncoder, outputEncoder, timeoutConf)
/**
 * @inheritdoc
 *
 * Variant that also takes an initial-state dataset. Adds no logic of its own: all six
 * arguments are forwarded, in order, to the parent implementation.
 */
override def flatMapGroupsWithState[S, U](
func: FlatMapGroupsWithStateFunction[K, V, S, U],
outputMode: OutputMode,
stateEncoder: Encoder[S],
outputEncoder: Encoder[U],
timeoutConf: GroupStateTimeout,
initialState: sql.KeyValueGroupedDataset[K, S]): Dataset[U] = super.flatMapGroupsWithState(
func,
outputMode,
stateEncoder,
outputEncoder,
timeoutConf,
initialState)
/**
 * @inheritdoc
 *
 * Explicit-encoder variant keyed on a `TimeMode`. Adds no logic of its own: all four
 * arguments are forwarded, in order, to the parent implementation. The result type is
 * written out as `Dataset[U]`, like the other overrides in this class, rather than being
 * left to inference.
 */
override private[sql] def transformWithState[U: Encoder](
    statefulProcessor: StatefulProcessor[K, V, U],
    timeMode: TimeMode,
    outputMode: OutputMode,
    outputEncoder: Encoder[U]): Dataset[U] =
  super.transformWithState(statefulProcessor, timeMode, outputMode, outputEncoder)
/**
 * @inheritdoc
 *
 * Explicit-encoder variant keyed on an event-time column name. Adds no logic of its own:
 * all four arguments are forwarded, in order, to the parent implementation. The result
 * type is written out as `Dataset[U]`, like the other overrides in this class, rather
 * than being left to inference.
 */
override private[sql] def transformWithState[U: Encoder](
    statefulProcessor: StatefulProcessor[K, V, U],
    eventTimeColumnName: String,
    outputMode: OutputMode,
    outputEncoder: Encoder[U]): Dataset[U] =
  super.transformWithState(statefulProcessor, eventTimeColumnName, outputMode, outputEncoder)
/**
 * @inheritdoc
 *
 * Explicit-encoder variant with an initial state, keyed on a `TimeMode`. Adds no logic of
 * its own: all six arguments are forwarded, in order, to the parent implementation. The
 * result type is written out as `Dataset[U]`, like the other overrides in this class,
 * rather than being left to inference.
 */
override private[sql] def transformWithState[U: Encoder, S: Encoder](
    statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
    timeMode: TimeMode,
    outputMode: OutputMode,
    initialState: sql.KeyValueGroupedDataset[K, S],
    outputEncoder: Encoder[U],
    initialStateEncoder: Encoder[S]): Dataset[U] =
  super.transformWithState(
    statefulProcessor,
    timeMode,
    outputMode,
    initialState,
    outputEncoder,
    initialStateEncoder)
/**
 * @inheritdoc
 *
 * Explicit-encoder variant with an initial state, keyed on an event-time column name.
 * Adds no logic of its own: all six arguments are forwarded, in order, to the parent
 * implementation. The result type is written out as `Dataset[U]`, like the other
 * overrides in this class, rather than being left to inference.
 */
override private[sql] def transformWithState[U: Encoder, S: Encoder](
    statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
    outputMode: OutputMode,
    initialState: sql.KeyValueGroupedDataset[K, S],
    eventTimeColumnName: String,
    outputEncoder: Encoder[U],
    initialStateEncoder: Encoder[S]): Dataset[U] =
  super.transformWithState(
    statefulProcessor,
    outputMode,
    initialState,
    eventTimeColumnName,
    outputEncoder,
    initialStateEncoder)
/**
 * @inheritdoc
 *
 * Adds no logic of its own: `f` is forwarded unchanged to the parent implementation and
 * the result is declared as `Dataset[(K, V)]`.
 */
override def reduceGroups(f: ReduceFunction[V]): Dataset[(K, V)] = super.reduceGroups(f)
/**
 * @inheritdoc
 *
 * One-column variant. Adds no logic of its own: `col1` is forwarded unchanged to the
 * parent implementation and the result is declared as `Dataset[(K, U1)]`.
 */
override def agg[U1](col1: TypedColumn[V, U1]): Dataset[(K, U1)] = super.agg(col1)
/**
 * @inheritdoc
 *
 * Two-column variant. Adds no logic of its own: both typed columns are forwarded, in
 * order, to the parent implementation.
 */
override def agg[U1, U2](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2]): Dataset[(K, U1, U2)] = super.agg(col1, col2)
/**
 * @inheritdoc
 *
 * Three-column variant. Adds no logic of its own: the typed columns are forwarded, in
 * order, to the parent implementation.
 */
override def agg[U1, U2, U3](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2],
col3: TypedColumn[V, U3]): Dataset[(K, U1, U2, U3)] = super.agg(col1, col2, col3)
/**
 * @inheritdoc
 *
 * Four-column variant. Adds no logic of its own: the typed columns are forwarded, in
 * order, to the parent implementation.
 */
override def agg[U1, U2, U3, U4](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2],
col3: TypedColumn[V, U3],
col4: TypedColumn[V, U4]): Dataset[(K, U1, U2, U3, U4)] = super.agg(col1, col2, col3, col4)
/**
 * @inheritdoc
 *
 * Five-column variant. Adds no logic of its own: the typed columns are forwarded, in
 * order, to the parent implementation.
 */
override def agg[U1, U2, U3, U4, U5](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2],
col3: TypedColumn[V, U3],
col4: TypedColumn[V, U4],
col5: TypedColumn[V, U5]): Dataset[(K, U1, U2, U3, U4, U5)] =
super.agg(col1, col2, col3, col4, col5)
/**
 * @inheritdoc
 *
 * Six-column variant. Adds no logic of its own: the typed columns are forwarded, in
 * order, to the parent implementation.
 */
override def agg[U1, U2, U3, U4, U5, U6](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2],
col3: TypedColumn[V, U3],
col4: TypedColumn[V, U4],
col5: TypedColumn[V, U5],
col6: TypedColumn[V, U6]): Dataset[(K, U1, U2, U3, U4, U5, U6)] =
super.agg(col1, col2, col3, col4, col5, col6)
/**
 * @inheritdoc
 *
 * Seven-column variant. Adds no logic of its own: the typed columns are forwarded, in
 * order, to the parent implementation.
 */
override def agg[U1, U2, U3, U4, U5, U6, U7](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2],
col3: TypedColumn[V, U3],
col4: TypedColumn[V, U4],
col5: TypedColumn[V, U5],
col6: TypedColumn[V, U6],
col7: TypedColumn[V, U7]): Dataset[(K, U1, U2, U3, U4, U5, U6, U7)] =
super.agg(col1, col2, col3, col4, col5, col6, col7)
/**
 * @inheritdoc
 *
 * Eight-column variant. Adds no logic of its own: the typed columns are forwarded, in
 * order, to the parent implementation.
 */
override def agg[U1, U2, U3, U4, U5, U6, U7, U8](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2],
col3: TypedColumn[V, U3],
col4: TypedColumn[V, U4],
col5: TypedColumn[V, U5],
col6: TypedColumn[V, U6],
col7: TypedColumn[V, U7],
col8: TypedColumn[V, U8]): Dataset[(K, U1, U2, U3, U4, U5, U6, U7, U8)] =
super.agg(col1, col2, col3, col4, col5, col6, col7, col8)
/**
 * @inheritdoc
 *
 * Adds no logic of its own: calls the parent implementation and declares the result as
 * `Dataset[(K, Long)]`.
 */
override def count(): Dataset[(K, Long)] = super.count()
/**
 * @inheritdoc
 *
 * Scala-function variant with two parameter lists. Adds no logic of its own: `other` and
 * `f` are forwarded to the parent implementation in the same two-list shape.
 */
override def cogroup[U, R: Encoder](other: sql.KeyValueGroupedDataset[K, U])(
f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R] =
super.cogroup(other)(f)
/**
 * @inheritdoc
 *
 * `CoGroupFunction` variant with an explicit encoder. Adds no logic of its own: `other`,
 * `f` and `encoder` are forwarded unchanged to the parent implementation.
 */
override def cogroup[U, R](
other: sql.KeyValueGroupedDataset[K, U],
f: CoGroupFunction[K, V, U, R],
encoder: Encoder[R]): Dataset[R] = super.cogroup(other, f, encoder)
/**
 * @inheritdoc
 *
 * Adds no logic of its own: the other dataset, both arrays of sort expressions, `f` and
 * `encoder` are forwarded, in order, to the parent implementation.
 */
override def cogroupSorted[U, R](
other: sql.KeyValueGroupedDataset[K, U],
thisSortExprs: Array[Column],
otherSortExprs: Array[Column],
f: CoGroupFunction[K, V, U, R],
encoder: Encoder[R]): Dataset[R] =
super.cogroupSorted(other, thisSortExprs, otherSortExprs, f, encoder)
}
/**
* This class is the implementation of class [[KeyValueGroupedDataset]]. This class memorizes the
* initial types of the grouping function so that the original function will be sent to the server
* to perform the grouping first. Then any type modifications on the keys and the values will be
* applied sequentially to ensure the final type of the result remains the same as how
* [[KeyValueGroupedDataset]] behaves on the server.
*/
private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
private val sparkSession: SparkSession,
private val plan: proto.Plan,
private val kEncoder: AgnosticEncoder[K],
private val ivEncoder: AgnosticEncoder[IV],
private val vEncoder: AgnosticEncoder[V],
private val groupingColumns: Seq[Column],
private val valueMapFunc: Option[IV => V],
private val keysFunc: () => Dataset[IK])
extends KeyValueGroupedDataset[K, V] {