in stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala [2248:2920]
def groupBy[K](
maxSubstreams: Int,
f: function.Function[Out, K],
allowClosedSubstreamRecreation: Boolean): SubFlow[In, Out, Mat] =
new SubFlow(delegate.groupBy(maxSubstreams, f.apply, allowClosedSubstreamRecreation))
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
* using the given function. When a new key is encountered for the first time
* a new substream is opened and subsequently fed with all elements belonging to
* that key.
*
* WARNING: The operator keeps track of all keys of streams that have already been closed.
* If you expect an infinite number of keys this can cause memory issues. Elements belonging
* to those keys are drained directly and not sent to the substream.
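*
* A minimal Java sketch of typical usage (illustrative only; the parity key function is an assumption,
* and the usual `org.apache.pekko.stream.javadsl` and `org.apache.pekko.NotUsed` imports are assumed):
*
* {{{
* Flow<Integer, Integer, NotUsed> grouped =
*     Flow.of(Integer.class)
*         .groupBy(2, i -> Math.floorMod(i, 2)) // at most two substreams, keyed by parity
*         .map(i -> i * 10)                     // applied to every substream
*         .mergeSubstreams();                   // leave substream mode again
* }}}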
*
* @see [[#groupBy]]
*/
def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.groupBy(maxSubstreams, f.apply, false))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams, always beginning a new one with
* the current element if the given predicate returns true for it. This means
* that for the following series of predicate values, three substreams will
* be produced with lengths 1, 2, and 3:
*
* {{{
* false, // element goes into first substream
* true, false, // elements go into second substream
* true, false, false // elements go into third substream
* }}}
*
* In case the *first* element of the stream matches the predicate, the first
* substream emitted by splitWhen will start from that element. For example:
*
* {{{
* true, false, false // first substream starts from the split-by element
* true, false // subsequent substreams operate the same way
* }}}
*
* The object returned from this method is not a normal [[Flow]],
* it is a [[SubFlow]]. This means that after this operator all transformations
* are applied to all encountered substreams in the same fashion. Substream mode
* is exited either by closing the substream (i.e. connecting it to a [[Sink]])
* or by merging the substreams back together; see the `to` and `mergeBack` methods
* on [[SubFlow]] for more information.
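*
* A minimal Java sketch of entering and leaving substream mode (illustrative only; the predicate and the
* sample input are assumptions, and the usual `org.apache.pekko.stream.javadsl` imports are assumed):
*
* {{{
* Flow<Integer, Integer, NotUsed> sums =
*     Flow.of(Integer.class)
*         .splitWhen(i -> i % 3 == 0)   // input 1..6 is split into [1, 2], [3, 4, 5], [6]
*         .fold(0, (acc, i) -> acc + i) // applied to each substream: one sum per substream
*         .mergeSubstreams();           // back to a normal Flow
* }}}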
*
* It is important to note that the substreams also propagate back-pressure as
* any other stream, which means that blocking one substream will block the `splitWhen`
* operator itself—and thereby all substreams—once all internal or
* explicit buffers are filled.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[pekko.stream.Supervision#stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' the provided predicate is true for an element, opening and emitting
* a new substream for subsequent elements
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous
* is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel on `SubstreamCancelStrategy.drain()`, downstream
* cancels or any substream cancels on `SubstreamCancelStrategy.propagate()`
*
* See also [[Flow.splitAfter]].
*/
def splitWhen(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitWhen(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams, always beginning a new one with
* the current element if the given predicate returns true for it.
*
* @see [[#splitWhen]]
*/
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitWhen(substreamCancelStrategy)(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true. This means that for the following series of predicate values,
* three substreams will be produced with lengths 2, 2, and 3:
*
* {{{
* false, true, // elements go into first substream
* false, true, // elements go into second substream
* false, false, true // elements go into third substream
* }}}
*
* The object returned from this method is not a normal [[Flow]],
* it is a [[SubFlow]]. This means that after this operator all transformations
* are applied to all encountered substreams in the same fashion. Substream mode
* is exited either by closing the substream (i.e. connecting it to a [[Sink]])
* or by merging the substreams back together; see the `to` and `mergeBack` methods
* on [[SubFlow]] for more information.
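*
* A minimal Java sketch (illustrative only; the predicate and the sample input are assumptions,
* and the usual `org.apache.pekko.stream.javadsl` imports are assumed):
*
* {{{
* Flow<Integer, Integer, NotUsed> sizes =
*     Flow.of(Integer.class)
*         .splitAfter(i -> i % 3 == 0)        // input 1..6 is split into [1, 2, 3], [4, 5, 6]
*         .fold(0, (count, i) -> count + 1)   // applied to each substream: one element count per substream
*         .mergeSubstreams();                 // back to a normal Flow
* }}}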
*
* It is important to note that the substreams also propagate back-pressure as
* any other stream, which means that blocking one substream will block the `splitAfter`
* operator itself—and thereby all substreams—once all internal or
* explicit buffers are filled.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[pekko.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[pekko.stream.Supervision.Resume]] or [[pekko.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element passes through. When the provided predicate is true it emits the element
* and opens a new substream for subsequent elements
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous
* is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel on `SubstreamCancelStrategy.drain`, downstream
* cancels or any substream cancels on `SubstreamCancelStrategy.propagate`
*
* See also [[Flow.splitWhen]].
*/
def splitAfter(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitAfter(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true.
*
* @see [[#splitAfter]]
*/
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitAfter(substreamCancelStrategy)(p.test))
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,
* fully consuming one Source after the other.
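*
* A minimal Java sketch (illustrative only; the inner sources are an assumption, and the usual
* `org.apache.pekko.stream.javadsl` and `java.util.Arrays` imports are assumed):
*
* {{{
* Flow<Integer, Integer, NotUsed> expanded =
*     Flow.of(Integer.class)
*         .flatMapConcat(i -> Source.from(Arrays.asList(i, i * 10)));
* // for input 1, 2 the flow emits 1, 10, 2, 20
* }}}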
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*/
def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Flow[In, T, Mat] =
new Flow(delegate.flatMapConcat[T, M](x => f(x)))
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by merging, where at most `breadth`
* substreams are being consumed at any given time.
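*
* A minimal Java sketch (illustrative only; the inner sources and the breadth of 2 are assumptions,
* and the usual `org.apache.pekko.stream.javadsl` and `java.util.Arrays` imports are assumed):
*
* {{{
* Flow<Integer, Integer, NotUsed> merged =
*     Flow.of(Integer.class)
*         .flatMapMerge(2, i -> Source.from(Arrays.asList(i, i * 10)));
* // at most two inner sources are consumed at once; the relative order of
* // elements coming from different inner sources is not guaranteed
* }}}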
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*/
def flatMapMerge[T, M](breadth: Int, f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Flow[In, T, Mat] =
new Flow(delegate.flatMapMerge(breadth, o => f(o)))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
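*
* A minimal Java sketch (illustrative only; the sample values are assumptions, and the usual
* `org.apache.pekko.stream.javadsl` and `java.util.Arrays` imports are assumed):
*
* {{{
* Flow<Integer, Integer, NotUsed> withTail =
*     Flow.of(Integer.class).concat(Source.from(Arrays.asList(4, 5)));
* // for input 1, 2, 3 the flow emits 1, 2, 3, 4, 5
* }}}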
*
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazy]]
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def concat[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.concat(that))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' given [[Source]] completes
*
* '''Cancels when''' downstream cancels
*/
def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.concatLazy(that))
/**
* Concatenate the given [[Source]]s to this [[Flow]], meaning that once this
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
* Note that the [[Source]]s are materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with for example `Source.lazySource` to defer materialization of `that` until the
* time when this source completes.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concat]]
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]]s will be pulled.
*
* '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all the given [[Source]]s complete
*
* '''Cancels when''' downstream cancels
*/
@varargs
@SafeVarargs
def concatAllLazy(those: Graph[SourceShape[Out], _]*): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.concatAllLazy(those: _*))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a concat operator that is not detached use [[#concatLazyMat]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#concat]]
*/
def concatMat[M, M2](
that: Graph[SourceShape[Out], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.concatMat(that)(combinerToScala(matF)))
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flow’s input is exhausted and all result elements have been generated,
* the Source’s elements will be produced.
*
* Note that the [[Source]] is materialized together with this Flow. If `lazy` materialization is what is needed
* the operator can be combined with `Source.lazy` to defer materialization of `that`.
*
* The second source is then kept from producing elements by asserting back-pressure until its time comes.
*
* For a concat operator that is detached, use [[#concatMat]]
*
* @see [[#concatLazy]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def concatLazyMat[M, M2](
that: Graph[SourceShape[Out], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.concatLazyMat(that)(combinerToScala(matF)))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
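*
* A minimal Java sketch (illustrative only; the sample values are assumptions, and the usual
* `org.apache.pekko.stream.javadsl` imports are assumed):
*
* {{{
* Flow<Integer, Integer, NotUsed> withHeader =
*     Flow.of(Integer.class).prepend(Source.single(0));
* // for input 1, 2 the flow emits 0, 1, 2
* }}}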
*
* Note that the [[Source]] is materialized together with this Flow and is "detached" meaning it will
* in effect behave as a one element buffer in front of both the sources, that eagerly demands an element on start
* (so it can not be combined with `Source.lazy` to defer materialization of `that`).
*
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is not detached use [[#prependLazy]]
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.prepend(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow and will then be kept from producing elements
* by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is also detached use [[#prepend]]
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' this [[Flow]] completes
*
* '''Cancels when''' downstream cancels
*/
def prependLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.prependLazy(that))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is not detached use [[#prependLazyMat]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#prepend]]
*/
def prependMat[M, M2](
that: Graph[SourceShape[Out], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.prependMat(that)(combinerToScala(matF)))
/**
* Prepend the given [[Source]] to this [[Flow]], meaning that before elements
* are generated from this Flow, the Source's elements will be produced until it
* is exhausted, at which point Flow elements will start being produced.
*
* Note that the [[Source]] is materialized together with this Flow.
*
* This flow will then be kept from producing elements by asserting back-pressure until its time comes.
*
* When needing a prepend operator that is detached use [[#prependMat]]
*
* @see [[#prependLazy]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def prependLazyMat[M, M2](
that: Graph[SourceShape[Out], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.prependLazyMat(that)(combinerToScala(matF)))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
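*
* A minimal Java sketch (illustrative only; the fallback value is an assumption, and the usual
* `org.apache.pekko.stream.javadsl` imports are assumed):
*
* {{{
* Flow<String, String, NotUsed> withFallback =
*     Flow.of(String.class).orElse(Source.single("default"));
* // an empty input yields "default"; a non-empty input passes through unchanged
* }}}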
*
* Note that this Flow will be materialized together with the [[Source]] and just kept
* from producing elements by asserting back-pressure until its time comes or it gets
* cancelled.
*
* On errors the operator is failed regardless of source of the error.
*
* '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element
* is available from the second stream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
* without emitting and the secondary stream already has completed or when the secondary stream completes
*
* '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
* by from this stream.
*/
def orElse[M](secondary: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.orElse(secondary))
/**
* Provides a secondary source that will be consumed if this source completes without any
* elements passing by. As soon as the first element comes through this stream, the alternative
* will be cancelled.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#orElse]]
*/
def orElseMat[M2, M3](
secondary: Graph[SourceShape[Out], M2],
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.orElseMat(secondary)(combinerToScala(matF)))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass
* through will also be sent to the [[Sink]].
*
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready.
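*
* A minimal Java sketch (illustrative only; the printing sink is an assumption, and the usual
* `org.apache.pekko.stream.javadsl` imports are assumed):
*
* {{{
* Flow<String, String, NotUsed> audited =
*     Flow.of(String.class)
*         .alsoTo(Sink.foreach(s -> System.out.println("audit: " + s)));
* // every element reaches both the printing sink and the downstream
* }}}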
*
* '''Emits when''' element is available and demand exists both from the Sink and the downstream.
*
* '''Backpressures when''' downstream or Sink backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream or Sink cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.alsoTo(that))
/**
* Attaches the given [[Sink]]s to this [[Flow]], meaning that elements that pass
* through will also be sent to all those [[Sink]]s.
*
* It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s are not ready.
*
* '''Emits when''' element is available and demand exists both from the Sinks and the downstream.
*
* '''Backpressures when''' downstream or any of the [[Sink]]s backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream or any of the [[Sink]]s cancels
*/
@varargs
@SafeVarargs
def alsoToAll(those: Graph[SinkShape[Out], _]*): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.alsoToAll(those: _*))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass
* through will also be sent to the [[Sink]].
*
* It is similar to [[#wireTapMat]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
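*
* A minimal Java sketch of combining materialized values with `Keep.right` (illustrative only; the choice of
* `Sink.seq` is an assumption, and the usual `org.apache.pekko.stream.javadsl`, `java.util.List` and
* `java.util.concurrent.CompletionStage` imports are assumed):
*
* {{{
* Flow<Integer, Integer, CompletionStage<List<Integer>>> collecting =
*     Flow.of(Integer.class).alsoToMat(Sink.<Integer>seq(), Keep.right());
* // the materialized value is the one of the attached Sink
* }}}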
*
* @see [[#alsoTo]]
*/
def alsoToMat[M2, M3](
that: Graph[SinkShape[Out], M2],
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.alsoToMat(that)(combinerToScala(matF)))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
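*
* A minimal Java sketch (illustrative only; the predicate and the printing sink are assumptions,
* and the usual `org.apache.pekko.stream.javadsl` imports are assumed):
*
* {{{
* Flow<Integer, Integer, NotUsed> nonNegative =
*     Flow.of(Integer.class)
*         .divertTo(Sink.foreach(i -> System.out.println("diverted: " + i)), i -> i < 0);
* // negative elements go to the printing sink only; all others continue downstream
* }}}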
*
* '''Emits when''' an element is available from the input and the chosen output has demand
*
* '''Backpressures when''' the currently chosen output back-pressures
*
* '''Completes when''' upstream completes and no output is pending
*
* '''Cancels when''' any of the downstreams cancel
*/
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.divertTo(that, when.test))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]]
* instead of being passed through if the predicate `when` returns `true`.
*
* @see [[#divertTo]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def divertToMat[M2, M3](
that: Graph[SinkShape[Out], M2],
when: function.Predicate[Out],
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.divertToMat(that, when.test)(combinerToScala(matF)))
/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* It is similar to [[#alsoTo]] which does backpressure instead of dropping elements.
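*
* A minimal Java sketch (illustrative only; the printing sink is an assumption, and the usual
* `org.apache.pekko.stream.javadsl` imports are assumed):
*
* {{{
* Flow<String, String, NotUsed> tapped =
*     Flow.of(String.class)
*         .wireTap(Sink.foreach(s -> System.out.println("tap: " + s)));
* // the tap may miss elements whenever the printing sink has no demand
* }}}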
*
* '''Emits when''' element is available and demand exists from the downstream; the element will
* also be sent to the wire-tap Sink if there is demand.
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.wireTap(that))
/**
* Attaches the given [[Sink]] to this [[Flow]] as a wire tap, meaning that elements that pass
* through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow.
* If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead.
*
* It is similar to [[#alsoToMat]] which does backpressure instead of dropping elements.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#wireTap]]
*/
def wireTapMat[M2, M3](
that: Graph[SinkShape[Out], M2],
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.wireTapMat(that)(combinerToScala(matF)))
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It first emits `segmentSize` elements from this flow downstream, then the same number from `that` source,
* and then repeats the process.
*
* Example:
* {{{
* Source<Integer, ?> src = Source.from(Arrays.asList(1, 2, 3));
* Flow<Integer, Integer, ?> flow = Flow.of(Integer.class).interleave(Source.from(Arrays.asList(4, 5, 6, 7)), 2);
* src.via(flow); // 1, 2, 4, 5, 3, 6, 7
* }}}
*
* Once one of the upstreams completes, all remaining elements are emitted from the other one.
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* '''Emits when''' element is available from the currently consumed upstream
*
* '''Backpressures when''' downstream backpressures. Demand is signalled to the current
* upstream, and the operator switches to the next upstream after receiving `segmentSize` elements
*
* '''Completes when''' the [[Flow]] and the given [[Source]] complete
*
* '''Cancels when''' downstream cancels
*/
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): javadsl.Flow[In, Out, Mat] =
interleave(that, segmentSize, eagerClose = false)
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It first emits `segmentSize` elements from this flow downstream, then the same number from `that` source,
* and then repeats the process.
*
* If eagerClose is false and one of the upstreams completes, the elements from the other upstream will continue passing
* through the interleave operator. If eagerClose is true and one of the upstreams completes, interleave will cancel the
* other upstream and complete itself.
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* '''Emits when''' element is available from the currently consumed upstream
*
* '''Backpressures when''' downstream backpressures. Demand is signalled to the current
* upstream, and the operator switches to the next upstream after receiving `segmentSize` elements
*
* '''Completes when''' the [[Flow]] and the given [[Source]] complete
*
* '''Cancels when''' downstream cancels
*/
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int, eagerClose: Boolean): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.interleave(that, segmentSize, eagerClose))
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It first emits `segmentSize` elements from this flow downstream, then the same number from `that` source,
* and then repeats the process.
*
* Once one of the upstreams completes, all remaining elements are emitted from the other one.
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#interleave]]
*/
def interleaveMat[M, M2](
that: Graph[SourceShape[Out], M],
segmentSize: Int,
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
interleaveMat(that, segmentSize, eagerClose = false, matF)
/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
* It first emits `segmentSize` elements from this flow downstream, then the same number from `that` source,
* and then repeats the process.
*
* If eagerClose is false and one of the upstreams completes, the elements from the other upstream will continue passing
* through the interleave operator. If eagerClose is true and one of the upstreams completes, interleave will cancel the
* other upstream and complete itself.
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#interleave]]
*/
def interleaveMat[M, M2](
that: Graph[SourceShape[Out], M],
segmentSize: Int,
eagerClose: Boolean,
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] =
new Flow(delegate.interleaveMat(that, segmentSize, eagerClose)(combinerToScala(matF)))
/**
* Interleave is a deterministic merge of the given [[Source]]s with elements of this [[Flow]].
* It first emits `segmentSize` elements from this flow downstream, then the same number from each of `those` sources
* in turn, and then repeats the process.
*
* If eagerClose is false and one of the upstreams completes, the elements from the other upstreams will continue passing
* through the interleave operator. If eagerClose is true and one of the upstreams completes, interleave will cancel the
* other upstreams and complete itself.
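*
* A minimal Java sketch (illustrative only; the sample sources and the segment size of 1 are assumptions,
* and the usual `org.apache.pekko.stream.javadsl` and `java.util.Arrays` imports are assumed):
*
* {{{
* Flow<Integer, Integer, NotUsed> interleaved =
*     Flow.of(Integer.class)
*         .interleaveAll(
*             Arrays.asList(Source.from(Arrays.asList(10, 20)), Source.from(Arrays.asList(100, 200))),
*             1,      // segmentSize
*             false); // eagerClose
* // for input 1, 2 the flow emits 1, 10, 100, 2, 20, 200
* }}}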
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* '''Emits when''' element is available from the currently consumed upstream
*
* '''Backpressures when''' downstream backpressures. Demand is signalled to the current
* upstream, and the operator switches to the next upstream after receiving `segmentSize` elements
*
* '''Completes when''' the [[Flow]] and all given [[Source]]s complete
*
* '''Cancels when''' downstream cancels
*/
def interleaveAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
val seq = if (those != null) Util.immutableSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
else immutable.Seq()
new Flow(delegate.interleaveAll(seq, segmentSize, eagerClose))
}