in stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala [1058:1752]
def via[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Source[T, Mat] = {
  // Append the flow on the underlying delegate, then wrap the result for the Java API.
  val appended = delegate.via(flow)
  new Source(appended)
}
/**
 * Appends the given processing operators to this [[Source]], yielding a new [[Source]].
 * {{{
 *     +----------------------------+
 *     | Resulting Source           |
 *     |                            |
 *     |  +------+        +------+  |
 *     |  |      |        |      |  |
 *     |  | this | ~Out~> | flow | ~~> T
 *     |  |      |        |      |  |
 *     |  +------+        +------+  |
 *     +----------------------------+
 * }}}
 * The materialized value of the resulting Source is computed by `combine` from the
 * materialized value of this source and the materialized value of `flow`.
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param flow the operators to append to this source
 * @param combine function merging the two materialized values
 */
def viaMat[T, M, M2](
    flow: Graph[FlowShape[Out, T], M],
    combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = {
  val scalaCombine = combinerToScala(combine)
  new Source(delegate.viaMat(flow)(scalaCombine))
}
/**
 * Connects this [[Source]] to the given [[Sink]], joining the processing steps of both into a
 * [[RunnableGraph]].
 * {{{
 *     +----------------------------+
 *     | Resulting RunnableGraph    |
 *     |                            |
 *     |  +------+        +------+  |
 *     |  |      |        |      |  |
 *     |  | this | ~Out~> | sink |  |
 *     |  |      |        |      |  |
 *     |  +------+        +------+  |
 *     +----------------------------+
 * }}}
 * The resulting graph keeps the materialized value of this source; the value of the given
 * Sink is ignored. Use `toMat` if a different strategy is needed.
 *
 * @param sink the sink to connect to
 */
def to[M](sink: Graph[SinkShape[Out], M]): javadsl.RunnableGraph[Mat] = {
  val connected = delegate.to(sink)
  RunnableGraph.fromGraph(connected)
}
/**
 * Connects this [[Source]] to the given [[Sink]], joining the processing steps of both into a
 * [[RunnableGraph]].
 * {{{
 *     +----------------------------+
 *     | Resulting RunnableGraph    |
 *     |                            |
 *     |  +------+        +------+  |
 *     |  |      |        |      |  |
 *     |  | this | ~Out~> | sink |  |
 *     |  |      |        |      |  |
 *     |  +------+        +------+  |
 *     +----------------------------+
 * }}}
 * The materialized value of the resulting graph is computed by `combine` from the
 * materialized value of this source and the materialized value of the Sink.
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param sink the sink to connect to
 * @param combine function merging the two materialized values
 */
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] = {
  val scalaCombine = combinerToScala(combine)
  val connected = delegate.toMat(sink)(scalaCombine)
  RunnableGraph.fromGraph(connected)
}
/**
 * Runs this `Source` connected to `Sink.ignore`: every element of the stream is consumed
 * and discarded.
 *
 * An overload accepting a `ClassicActorSystemProvider` is available for running the stream
 * with the [[pekko.stream.SystemMaterializer]].
 *
 * @param materializer the materializer used to run the stream
 * @return the completion of the underlying run, exposed as a `CompletionStage`
 */
def run(materializer: Materializer): CompletionStage[Done] = {
  val completion = delegate.run()(materializer)
  completion.asJava
}
/**
 * Runs this `Source` connected to `Sink.ignore`: every element of the stream is consumed
 * and discarded.
 *
 * The stream is run with the [[pekko.stream.SystemMaterializer]] of the actor system obtained
 * from `systemProvider`; an `ActorSystem` can be passed as that parameter.
 *
 * @param systemProvider provider of the actor system whose system materializer runs the stream
 * @return the completion of the underlying run, exposed as a `CompletionStage`
 */
def run(systemProvider: ClassicActorSystemProvider): CompletionStage[Done] = {
  val systemMaterializer = SystemMaterializer(systemProvider.classicSystem).materializer
  delegate.run()(systemMaterializer).asJava
}
/**
 * Connects this `Source` to the given `Sink` and runs the resulting stream. The result is the
 * materialized value of the `Sink`, e.g. the `Publisher` of a `Sink.asPublisher`.
 *
 * Either a classic or a typed `ActorSystem` can be passed as the `systemProvider` parameter.
 *
 * @param sink the sink to run this source with
 * @param systemProvider provider of the actor system whose system materializer runs the stream
 */
def runWith[M](sink: Graph[SinkShape[Out], M], systemProvider: ClassicActorSystemProvider): M = {
  val systemMaterializer = SystemMaterializer(systemProvider.classicSystem).materializer
  delegate.runWith(sink)(systemMaterializer)
}
/**
 * Connects this `Source` to the given `Sink` and runs the resulting stream. The result is the
 * materialized value of the `Sink`, e.g. the `Publisher` of a `Sink.asPublisher`.
 *
 * Prefer the method taking an `ActorSystem` unless you have special requirements.
 *
 * @param sink the sink to run this source with
 * @param materializer the materializer used to run the stream
 */
def runWith[M](sink: Graph[SinkShape[Out], M], materializer: Materializer): M = {
  val materializedValue = delegate.runWith(sink)(materializer)
  materializedValue
}
/**
 * Shortcut that runs this `Source` with a fold function.
 * The function is called for each received element, receiving its own previous output
 * (or the given `zero` value for the first call) together with the element.
 * The returned [[java.util.concurrent.CompletionStage]] is completed with the value of the last
 * function evaluation once the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 *
 * Either a classic or a typed `ActorSystem` can be passed as the `systemProvider` parameter.
 *
 * @param zero the initial accumulator value
 * @param f the fold function
 * @param systemProvider provider of the actor system used to run the stream
 */
def runFold[U](
    zero: U,
    f: function.Function2[U, Out, U],
    systemProvider: ClassicActorSystemProvider): CompletionStage[U] = {
  val foldSink: Sink[Out, CompletionStage[U]] = Sink.fold(zero, f)
  runWith(foldSink, systemProvider)
}
/**
 * Shortcut that runs this `Source` with a fold function.
 * The function is called for each received element, receiving its own previous output
 * (or the given `zero` value for the first call) together with the element.
 * The returned [[java.util.concurrent.CompletionStage]] is completed with the value of the last
 * function evaluation once the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 *
 * Prefer the method taking an ActorSystem unless you have special requirements.
 *
 * @param zero the initial accumulator value
 * @param f the fold function
 * @param materializer the materializer used to run the stream
 */
def runFold[U](zero: U, f: function.Function2[U, Out, U], materializer: Materializer): CompletionStage[U] = {
  val foldSink: Sink[Out, CompletionStage[U]] = Sink.fold(zero, f)
  runWith(foldSink, materializer)
}
/**
 * Shortcut that runs this `Source` with an asynchronous fold function.
 * The function is called for each received element, receiving its own previous output
 * (or the given `zero` value for the first call) together with the element.
 * The returned [[java.util.concurrent.CompletionStage]] is completed with the value of the last
 * function evaluation once the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 *
 * Either a classic or a typed `ActorSystem` can be passed as the `systemProvider` parameter.
 *
 * @param zero the initial accumulator value
 * @param f the asynchronous fold function
 * @param systemProvider provider of the actor system used to run the stream
 */
def runFoldAsync[U](
    zero: U,
    f: function.Function2[U, Out, CompletionStage[U]],
    systemProvider: ClassicActorSystemProvider): CompletionStage[U] = {
  val foldSink: Sink[Out, CompletionStage[U]] = Sink.foldAsync(zero, f)
  runWith(foldSink, systemProvider)
}
/**
 * Shortcut that runs this `Source` with an asynchronous fold function.
 * The function is called for each received element, receiving its own previous output
 * (or the given `zero` value for the first call) together with the element.
 * The returned [[java.util.concurrent.CompletionStage]] is completed with the value of the last
 * function evaluation once the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 *
 * Prefer the method taking an `ActorSystem` unless you have special requirements.
 *
 * @param zero the initial accumulator value
 * @param f the asynchronous fold function
 * @param materializer the materializer used to run the stream
 */
def runFoldAsync[U](
    zero: U,
    f: function.Function2[U, Out, CompletionStage[U]],
    materializer: Materializer): CompletionStage[U] = {
  val foldSink: Sink[Out, CompletionStage[U]] = Sink.foldAsync(zero, f)
  runWith(foldSink, materializer)
}
/**
 * Shortcut for running this `Source` with a reduce function.
 * The given function is invoked for every received element (from the second one onwards),
 * giving it its previous output and the element as input.
 * The returned [[java.util.concurrent.CompletionStage]] will be completed with the value of the final
 * function evaluation when the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 *
 * If the stream is empty (i.e. completes before signalling any elements),
 * the reduce operator will fail its downstream with a [[NoSuchElementException]],
 * which is semantically in-line with what Scala's standard library collections
 * do in such situations.
 *
 * Note that the classic or typed `ActorSystem` can be used as the `systemProvider` parameter.
 *
 * @param f the reduce function
 * @param systemProvider provider of the actor system used to run the stream
 */
def runReduce(
    f: function.Function2[Out, Out, Out],
    systemProvider: ClassicActorSystemProvider): CompletionStage[Out] =
  // Hand the provider over as-is, like the sibling run* shortcuts do; the provider-based
  // runWith overload resolves the classic system itself.
  runWith(Sink.reduce(f), systemProvider)
/**
 * Shortcut that runs this `Source` with a reduce function.
 * The function is called for each received element (from the second one onwards), receiving
 * its own previous output together with the element.
 * The returned [[java.util.concurrent.CompletionStage]] is completed with the value of the last
 * function evaluation once the input stream ends, or completed with `Failure`
 * if a failure is signaled in the stream.
 *
 * If the stream is empty (i.e. completes before signalling any elements),
 * the reduce operator fails its downstream with a [[NoSuchElementException]],
 * which is semantically in line with what Scala's standard library collections
 * do in such situations.
 *
 * Prefer the method taking an `ActorSystem` unless you have special requirements.
 *
 * @param f the reduce function
 * @param materializer the materializer used to run the stream
 */
def runReduce(f: function.Function2[Out, Out, Out], materializer: Materializer): CompletionStage[Out] = {
  val reduceSink: Sink[Out, CompletionStage[Out]] = Sink.reduce(f)
  runWith(reduceSink, materializer)
}
/**
 * Concatenates the given [[Source]] after this one: once this source is exhausted and all
 * of its result elements have been generated, the elements of the given source are produced.
 *
 * Note that the given [[Source]] is materialized together with this Source and is "detached",
 * meaning it will in effect behave as a one element buffer in front of both the sources that
 * eagerly demands an element on start (so it can not be combined with `Source.lazy` to defer
 * materialization of `that`).
 *
 * The second source is then kept from producing elements by asserting back-pressure until its
 * time comes.
 *
 * When needing a concat operator that is not detached use [[#concatLazy]]
 *
 * '''Emits when''' element is available from current source or from the given [[Source]] when current is completed
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' given [[Source]] completes
 *
 * '''Cancels when''' downstream cancels
 */
def concat[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = {
  val concatenated = delegate.concat(that)
  new Source(concatenated)
}
/**
 * Concatenates the given [[Source]] after this [[Source]]: once this source is exhausted and
 * all of its result elements have been generated, the elements of the given Source are produced.
 *
 * Note that the given [[Source]] is materialized together with this Source. If `lazy`
 * materialization is what is needed, the operator can be combined with for example
 * `Source.lazySource` to defer materialization of `that` until the time when this source
 * completes.
 *
 * The second source is then kept from producing elements by asserting back-pressure until its
 * time comes.
 *
 * For a concat operator that is detached, use [[#concat]]
 *
 * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled.
 *
 * '''Emits when''' element is available from current stream or from the given [[Source]] when current is completed
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' given [[Source]] completes
 *
 * '''Cancels when''' downstream cancels
 */
def concatLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = {
  val concatenated = delegate.concatLazy(that)
  new Source(concatenated)
}
/**
 * Concatenates the given [[Source]]s after this one: once this source is exhausted and all
 * of its result elements have been generated, the elements of the given Sources are produced.
 *
 * Note that the [[Source]]s are materialized together with this Source. If `lazy`
 * materialization is what is needed, the operator can be combined with for example
 * `Source.lazySource` to defer materialization of `those` until the time when this source
 * completes.
 *
 * The following sources are kept from producing elements by asserting back-pressure until
 * their time comes.
 *
 * For a concat operator that is detached, use [[#concat]]
 *
 * If this [[Source]] gets upstream error - no elements from the given [[Source]]s will be pulled.
 *
 * '''Emits when''' element is available from current stream or from the given [[Source]]s when current is completed
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' all the given [[Source]]s complete
 *
 * '''Cancels when''' downstream cancels
 */
@varargs
@SafeVarargs
def concatAllLazy(those: Graph[SourceShape[Out], _]*): javadsl.Source[Out, Mat] = {
  val concatenated = delegate.concatAllLazy(those: _*)
  new Source(concatenated)
}
/**
 * Concatenates the given [[Source]] after this one: once this source is exhausted and all
 * of its result elements have been generated, the elements of the given source are produced.
 *
 * Note that the given [[Source]] is materialized together with this Source and is "detached",
 * meaning it will in effect behave as a one element buffer in front of both the sources that
 * eagerly demands an element on start (so it can not be combined with `Source.lazy` to defer
 * materialization of `that`).
 *
 * The second source is then kept from producing elements by asserting back-pressure until its
 * time comes.
 *
 * When needing a concat operator that is not detached use [[#concatLazyMat]]
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param that the source whose elements follow those of this source
 * @param matF function merging the two materialized values
 * @see [[#concat]].
 */
def concatMat[M, M2](
    that: Graph[SourceShape[Out], M],
    matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.concatMat(that)(scalaCombine))
}
/**
 * Concatenates the given [[Source]] after this [[Source]]: once this source is exhausted and
 * all of its result elements have been generated, the elements of the given Source are produced.
 *
 * Note that the given [[Source]] is materialized together with this Source; if `lazy`
 * materialization is what is needed, the operator can be combined with `Source.lazy` to defer
 * materialization of `that`.
 *
 * The second source is then kept from producing elements by asserting back-pressure until its
 * time comes.
 *
 * For a concat operator that is detached, use [[#concatMat]]
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param that the source whose elements follow those of this source
 * @param matF function merging the two materialized values
 * @see [[#concatLazy]].
 */
def concatLazyMat[M, M2](
    that: Graph[SourceShape[Out], M],
    matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.concatLazyMat(that)(scalaCombine))
}
/**
 * Prepends the given [[Source]] to this one: once the given source is exhausted and all of
 * its result elements have been generated, the elements of the current source are produced.
 *
 * Note that the given [[Source]] is materialized together with this Source and is "detached",
 * meaning it will in effect behave as a one element buffer in front of both the sources that
 * eagerly demands an element on start (so it can not be combined with `Source.lazy` to defer
 * materialization of `that`).
 *
 * This source will then be kept from producing elements by asserting back-pressure until its
 * time comes.
 *
 * When needing a prepend operator that is not detached use [[#prependLazy]]
 *
 * '''Emits when''' element is available from the given [[Source]] or from current source when the given [[Source]] is completed
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' this [[Source]] completes
 *
 * '''Cancels when''' downstream cancels
 */
def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = {
  val prepended = delegate.prepend(that)
  new Source(prepended)
}
/**
 * Prepends the given [[Source]] to this [[Source]]: before any element of this source is
 * generated, the elements of the given Source are produced until it is exhausted, at which
 * point the elements of this source start being produced.
 *
 * Note that the given [[Source]] is materialized together with this Source, which will then
 * be kept from producing elements by asserting back-pressure until its time comes.
 *
 * When needing a prepend operator that is also detached use [[#prepend]]
 *
 * If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled.
 *
 * '''Emits when''' element is available from the given [[Source]] or from current stream when the [[Source]] is completed
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' this [[Source]] completes
 *
 * '''Cancels when''' downstream cancels
 */
def prependLazy[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = {
  val prepended = delegate.prependLazy(that)
  new Source(prepended)
}
/**
 * Prepends the given [[Source]] to this one: once the given source is exhausted and all of
 * its result elements have been generated, the elements of the current source are produced.
 *
 * Note that this Source will be materialized together with the given [[Source]] and just kept
 * from producing elements by asserting back-pressure until its time comes.
 *
 * When needing a prepend operator that is not detached use [[#prependLazyMat]]
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param that the source whose elements precede those of this source
 * @param matF function merging the two materialized values
 * @see [[#prepend]].
 */
def prependMat[M, M2](
    that: Graph[SourceShape[Out], M],
    matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.prependMat(that)(scalaCombine))
}
/**
 * Prepends the given [[Source]] to this [[Source]]: before any element of this source is
 * generated, the elements of the given Source are produced until it is exhausted, at which
 * point the elements of this source start being produced.
 *
 * Note that the given [[Source]] is materialized together with this Source.
 *
 * This source will then be kept from producing elements by asserting back-pressure until its
 * time comes.
 *
 * When needing a prepend operator that is detached use [[#prependMat]]
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param that the source whose elements precede those of this source
 * @param matF function merging the two materialized values
 * @see [[#prependLazy]].
 */
def prependLazyMat[M, M2](
    that: Graph[SourceShape[Out], M],
    matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.prependLazyMat(that)(scalaCombine))
}
/**
 * Supplies a secondary source that is consumed if this source completes without any
 * elements passing by. As soon as the first element comes through this stream, the alternative
 * is cancelled.
 *
 * Note that this Source will be materialized together with the secondary [[Source]], which is
 * just kept from producing elements by asserting back-pressure until its time comes or it gets
 * cancelled.
 *
 * On errors the operator is failed regardless of source of the error.
 *
 * '''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element
 *                  is available from the second stream
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes
 *                      without emitting and the secondary stream already has completed or when the secondary stream completes
 *
 * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
 *                    by from this stream.
 */
def orElse[M](secondary: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = {
  val withFallback = delegate.orElse(secondary)
  new Source(withFallback)
}
/**
 * Supplies a secondary source that is consumed if this source completes without any
 * elements passing by. As soon as the first element comes through this stream, the alternative
 * is cancelled.
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param secondary the alternative source
 * @param matF function merging the two materialized values
 * @see [[#orElse]]
 */
def orElseMat[M, M2](
    secondary: Graph[SourceShape[Out], M],
    matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.orElseMat(secondary)(scalaCombine))
}
/**
 * Attaches the given [[Sink]] to this [[Source]], so that every element passing through
 * is also sent to the [[Sink]].
 *
 * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready.
 *
 * '''Emits when''' element is available and demand exists both from the Sink and the downstream.
 *
 * '''Backpressures when''' downstream or Sink backpressures
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' downstream or Sink cancels
 */
def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] = {
  val tapped = delegate.alsoTo(that)
  new Source(tapped)
}
/**
 * Attaches the given [[Sink]]s to this [[Source]], so that every element passing through
 * is also sent to all those [[Sink]]s.
 *
 * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s are not ready.
 *
 * '''Emits when''' element is available and demand exists both from the Sinks and the downstream.
 *
 * '''Backpressures when''' downstream or any of the [[Sink]]s backpressures
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' downstream or any of the [[Sink]]s cancels
 */
@varargs
@SafeVarargs
def alsoToAll(those: Graph[SinkShape[Out], _]*): javadsl.Source[Out, Mat] = {
  val tapped = delegate.alsoToAll(those: _*)
  new Source(tapped)
}
/**
 * Attaches the given [[Sink]] to this [[Source]], so that every element passing through
 * is also sent to the [[Sink]].
 *
 * It is similar to [[#wireTapMat]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready.
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param that the sink that also receives the elements
 * @param matF function merging the two materialized values
 * @see [[#alsoTo]]
 */
def alsoToMat[M2, M3](
    that: Graph[SinkShape[Out], M2],
    matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.alsoToMat(that)(scalaCombine))
}
/**
 * Attaches the given [[Sink]] to this [[Source]], so that an element is sent to the [[Sink]]
 * instead of being passed through whenever the predicate `when` returns `true` for it.
 *
 * '''Emits when''' an element is available from the input and the chosen output has demand
 *
 * '''Backpressures when''' the currently chosen output back-pressures
 *
 * '''Completes when''' upstream completes and no output is pending
 *
 * '''Cancels when''' any of the downstreams cancel
 */
def divertTo(that: Graph[SinkShape[Out], _], when: function.Predicate[Out]): javadsl.Source[Out, Mat] = {
  val diverted = delegate.divertTo(that, elem => when.test(elem))
  new Source(diverted)
}
/**
 * Attaches the given [[Sink]] to this [[Source]], so that an element is sent to the [[Sink]]
 * instead of being passed through whenever the predicate `when` returns `true` for it.
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param that the sink receiving the diverted elements
 * @param when predicate selecting the elements to divert
 * @param matF function merging the two materialized values
 * @see [[#divertTo]]
 */
def divertToMat[M2, M3](
    that: Graph[SinkShape[Out], M2],
    when: function.Predicate[Out],
    matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.divertToMat(that, elem => when.test(elem))(scalaCombine))
}
/**
 * Attaches the given [[Sink]] to this [[Source]] as a wire tap: elements that pass through
 * are also sent to the wire-tap Sink, without the latter affecting the mainline flow.
 * If the wire-tap Sink backpressures, elements that would've been sent to it are dropped instead.
 *
 * It is similar to [[#alsoTo]] which does backpressure instead of dropping elements.
 *
 * '''Emits when''' element is available and demand exists from the downstream; the element will
 * also be sent to the wire-tap Sink if there is demand.
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' downstream cancels
 */
def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] = {
  val tapped = delegate.wireTap(that)
  new Source(tapped)
}
/**
 * Attaches the given [[Sink]] to this [[Source]] as a wire tap: elements that pass through
 * are also sent to the wire-tap Sink, without the latter affecting the mainline flow.
 * If the wire-tap Sink backpressures, elements that would've been sent to it are dropped instead.
 *
 * It is similar to [[#alsoToMat]] which does backpressure instead of dropping elements.
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param that the wire-tap sink
 * @param matF function merging the two materialized values
 * @see [[#wireTap]]
 */
def wireTapMat[M2, M3](
    that: Graph[SinkShape[Out], M2],
    matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.wireTapMat(that)(scalaCombine))
}
/**
 * Interleave is a deterministic merge of the given [[Source]] with the elements of this [[Source]].
 * It first emits `segmentSize` elements from this source to downstream, then the same
 * amount from `that` source, then repeats the process.
 *
 * Example:
 * {{{
 * Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7)), 2)
 * // 1, 2, 4, 5, 3, 6, 7
 * }}}
 *
 * After one of the sources completes, all remaining elements are emitted from the other one.
 *
 * If one of sources gets upstream error - stream completes with failure.
 *
 * '''Emits when''' element is available from the currently consumed upstream
 *
 * '''Backpressures when''' downstream backpressures. Signal to current
 * upstream, switch to next upstream when received `segmentSize` elements
 *
 * '''Completes when''' this [[Source]] and given one completes
 *
 * '''Cancels when''' downstream cancels
 */
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): javadsl.Source[Out, Mat] = {
  val interleaved = delegate.interleave(that, segmentSize)
  new Source(interleaved)
}
/**
 * Interleave is a deterministic merge of the given [[Source]] with the elements of this [[Source]].
 * It first emits `segmentSize` elements from this source to downstream, then the same
 * amount from `that` source, then repeats the process.
 *
 * If eagerClose is false and one of the upstreams completes, the elements from the other
 * upstream continue passing through the interleave operator. If eagerClose is true and one of
 * the upstreams completes, interleave cancels the other upstream and completes itself.
 *
 * If this [[Source]] or the given [[Source]] gets upstream error - stream completes with failure.
 *
 * '''Emits when''' element is available from the currently consumed upstream
 *
 * '''Backpressures when''' downstream backpressures. Signal to current
 * upstream, switch to next upstream when received `segmentSize` elements
 *
 * '''Completes when''' this [[Source]] and given [[Source]] completes
 *
 * '''Cancels when''' downstream cancels
 */
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int, eagerClose: Boolean): javadsl.Source[Out, Mat] = {
  val interleaved = delegate.interleave(that, segmentSize, eagerClose)
  new Source(interleaved)
}
/**
 * Interleave is a deterministic merge of the given [[Source]] with the elements of this [[Source]].
 * It first emits `segmentSize` elements from this source to downstream, then the same
 * amount from `that` source, then repeats the process.
 *
 * After one of the sources completes, all remaining elements are emitted from the other one.
 *
 * If one of sources gets upstream error - stream completes with failure.
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param that the source to interleave with
 * @param segmentSize number of elements taken from one source before switching to the other
 * @param matF function merging the two materialized values
 * @see [[#interleave]].
 */
def interleaveMat[M, M2](
    that: Graph[SourceShape[Out], M],
    segmentSize: Int,
    matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.interleaveMat(that, segmentSize)(scalaCombine))
}
/**
 * Interleave is a deterministic merge of the given [[Source]] with the elements of this [[Source]].
 * It first emits `segmentSize` elements from this source to downstream, then the same
 * amount from `that` source, then repeats the process.
 *
 * If eagerClose is false and one of the upstreams completes, the elements from the other
 * upstream continue passing through the interleave operator. If eagerClose is true and one of
 * the upstreams completes, interleave cancels the other upstream and completes itself.
 *
 * If this [[Source]] or the given [[Source]] gets upstream error - stream completes with failure.
 *
 * Prefer the internally optimized `Keep.left` and `Keep.right` combiners over hand-written
 * functions that merely pass one of the two values through.
 *
 * @param that the source to interleave with
 * @param segmentSize number of elements taken from one source before switching to the other
 * @param eagerClose whether completion of one upstream completes the whole operator
 * @param matF function merging the two materialized values
 * @see [[#interleave]]
 */
def interleaveMat[M, M2](
    that: Graph[SourceShape[Out], M],
    segmentSize: Int,
    eagerClose: Boolean,
    matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = {
  val scalaCombine = combinerToScala(matF)
  new Source(delegate.interleaveMat(that, segmentSize, eagerClose)(scalaCombine))
}
/**
 * Interleave is a deterministic merge of the given [[Source]]s with the elements of this [[Source]].
 * It first emits `segmentSize` elements from this source to downstream, then the same
 * amount from each of `those` sources, then repeats the process.
 *
 * If eagerClose is false and one of the upstreams completes, the elements from the other
 * upstreams continue passing through the interleave operator. If eagerClose is true and one of
 * the upstreams completes, interleave cancels the other upstreams and completes itself.
 *
 * If this [[Source]] or one of the given [[Source]]s gets upstream error - stream completes with failure.
 *
 * '''Emits when''' element is available from the currently consumed upstream
 *
 * '''Backpressures when''' downstream backpressures. Signal to current
 * upstream, switch to next upstream when received `segmentSize` elements
 *
 * '''Completes when''' this [[Source]] and the given [[Source]]s complete
 *
 * '''Cancels when''' downstream cancels
 *
 * @param those the sources to interleave with; a `null` list is treated as an empty one
 * @param segmentSize number of elements taken from one source before switching to the next
 * @param eagerClose whether completion of one upstream completes the whole operator
 */
def interleaveAll(
    those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
    segmentSize: Int,
    eagerClose: Boolean): javadsl.Source[Out, Mat] = {
  val scalaGraphs =
    if (those == null) immutable.Seq()
    else
      CollectionUtil.toSeq(those).collect {
        // Java DSL sources are converted through `asScala` before being handed to the delegate.
        case javaSource: Source[Out @unchecked, _] => javaSource.asScala
        // Every other graph is passed on unchanged.
        case graph => graph
      }
  new Source(delegate.interleaveAll(scalaGraphs, segmentSize, eagerClose))
}