in stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala [4076:5147]
def groupBy[K](
maxSubstreams: Int,
f: function.Function[Out, K],
allowClosedSubstreamRecreation: Boolean): SubSource[Out, Mat] =
new SubSource(delegate.groupBy(maxSubstreams, f.apply, allowClosedSubstreamRecreation))
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
* using the given function. When a new key is encountered for the first time
* a new substream is opened and subsequently fed with all elements belonging to
* that key.
*
* The object returned from this method is not a normal [[Flow]],
* it is a [[SubSource]]. This means that after this operator all transformations
* are applied to all encountered substreams in the same fashion. Substream mode
* is exited either by closing the substream (i.e. connecting it to a [[Sink]])
* or by merging the substreams back together; see the `to` and `mergeBack` methods
* on [[SubSource]] for more information.
*
* It is important to note that the substreams also propagate back-pressure as
* any other stream, which means that blocking one substream will block the `groupBy`
* operator itself—and thereby all substreams—once all internal or
* explicit buffers are filled.
*
* If the group by function `f` throws an exception and the supervision decision
* is [[pekko.stream.Supervision#stop]] the stream and substreams will be completed
* with failure.
*
* If the group by function `f` throws an exception and the supervision decision
* is [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]]
* the element is dropped and the stream and substreams continue.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
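*
* A minimal usage sketch from Java, for illustration only (the element values and key function are
* assumptions, not part of this API). Each substream is folded separately and then merged back:
*
* {{{
* Source<Integer, NotUsed> sums =
*     Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
*         .groupBy(2, i -> i % 2)        // at most 2 substreams: odd and even keys
*         .fold(0, (acc, i) -> acc + i)  // applied to every substream in the same fashion
*         .mergeSubstreams();            // leaves substream mode; yields 9 and 12, order not guaranteed
* }}}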
*
* '''Emits when''' an element for which the grouping function returns a group that has not yet been created.
* Emits the new group
*
* '''Backpressures when''' there is an element pending for a group whose substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and all substreams cancel
*
* @param maxSubstreams configures the maximum number of substreams (keys)
* that are supported; if more distinct keys are encountered then the stream fails
*/
def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K]): SubSource[Out @uncheckedVariance, Mat] =
new SubSource(delegate.groupBy(maxSubstreams, f.apply))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams, always beginning a new one with
* the current element if the given predicate returns true for it. This means
* that for the following series of predicate values, three substreams will
* be produced with lengths 1, 2, and 3:
*
* {{{
* false, // element goes into first substream
* true, false, // elements go into second substream
* true, false, false // elements go into third substream
* }}}
*
* In case the *first* element of the stream matches the predicate, the first
* substream emitted by splitWhen will start from that element. For example:
*
* {{{
* true, false, false // first substream starts from the split-by element
* true, false // subsequent substreams operate the same way
* }}}
*
* The object returned from this method is not a normal [[Flow]],
* it is a [[SubSource]]. This means that after this operator all transformations
* are applied to all encountered substreams in the same fashion. Substream mode
* is exited either by closing the substream (i.e. connecting it to a [[Sink]])
* or by merging the substreams back together; see the `to` and `mergeBack` methods
* on [[SubSource]] for more information.
*
* It is important to note that the substreams also propagate back-pressure as
* any other stream, which means that blocking one substream will block the `splitWhen`
* operator itself—and thereby all substreams—once all internal or
* explicit buffers are filled.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[pekko.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[pekko.stream.Supervision.Resume]] or [[pekko.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
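*
* A minimal usage sketch from Java, for illustration only (the element values and predicate are
* assumptions). A new substream begins at every element for which the predicate is true:
*
* {{{
* Source<Integer, NotUsed> sums =
*     Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
*         .splitWhen(i -> i % 3 == 0)    // substreams: [1, 2], [3, 4, 5], [6]
*         .fold(0, (acc, i) -> acc + i)  // one sum per substream
*         .concatSubstreams();           // leaves substream mode, one substream after the other
* }}}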
*
* '''Emits when''' an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
*
* See also [[Source.splitAfter]].
*/
def splitWhen(p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitWhen(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams, always beginning a new one with
* the current element if the given predicate returns true for it.
*
* @see [[#splitWhen]]
*/
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitWhen(substreamCancelStrategy)(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true. This means that for the following series of predicate values,
* three substreams will be produced with lengths 2, 2, and 3:
*
* {{{
* false, true, // elements go into first substream
* false, true, // elements go into second substream
* false, false, true // elements go into third substream
* }}}
*
* The object returned from this method is not a normal [[Flow]],
* it is a [[SubSource]]. This means that after this operator all transformations
* are applied to all encountered substreams in the same fashion. Substream mode
* is exited either by closing the substream (i.e. connecting it to a [[Sink]])
* or by merging the substreams back together; see the `to` and `mergeBack` methods
* on [[SubSource]] for more information.
*
* It is important to note that the substreams also propagate back-pressure as
* any other stream, which means that blocking one substream will block the `splitAfter`
* operator itself—and thereby all substreams—once all internal or
* explicit buffers are filled.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[pekko.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[pekko.stream.Supervision.Resume]] or [[pekko.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element passes through. When the provided predicate is true it emits the element
* and opens a new substream for subsequent elements
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous
* is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
*
* See also [[Source.splitWhen]].
*/
def splitAfter(p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitAfter(p.test))
/**
* This operation applies the given predicate to all incoming elements and
* emits them to a stream of output streams. It *ends* the current substream when the
* predicate is true.
*
* @see [[#splitAfter]]
*/
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitAfter(substreamCancelStrategy)(p.test))
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,
* fully consuming one Source after the other.
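*
* A minimal usage sketch from Java, for illustration only (the element values are assumptions).
* Each inner source is fully consumed before the next one is started:
*
* {{{
* Source<Integer, NotUsed> flattened =
*     Source.from(Arrays.asList(1, 2, 3))
*         .flatMapConcat(i -> Source.repeat(i).take(2));
* // emits 1, 1, 2, 2, 3, 3
* }}}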
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*/
def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat] =
new Source(delegate.flatMapConcat[T, M](x => f(x)))
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,
* fully consuming one Source after the other.
* `parallelism` can be used to configure the maximum number of in-flight sources, which will be materialized at the same time.
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
* @since 1.2.0
*/
def flatMapConcat[T, M](parallelism: Int, f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat] =
new Source(delegate.flatMapConcat[T, M](parallelism, x => f(x)))
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by merging, where at most `breadth`
* substreams are being consumed at any given time.
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*/
def flatMapMerge[T, M](breadth: Int, f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat] =
new Source(delegate.flatMapMerge(breadth, o => f(o)))
/**
* Transforms each input element into a `Source` of output elements that is
* then flattened into the output stream until a new input element is received
* at which point the current (now previous) substream is cancelled (which is why
* this operator is sometimes also called "flatMapLatest").
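*
* A minimal usage sketch from Java, for illustration only. Both `queries` (a `Source<String, NotUsed>`)
* and `search` (a method returning a `Source<String, NotUsed>` for a query) are hypothetical names:
*
* {{{
* // Only results for the most recent query are emitted; when a new query
* // arrives, the substream of the previous query is cancelled.
* Source<String, NotUsed> latestResults = queries.switchMap(q -> search(q));
* }}}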
*
* '''Emits when''' the current substream has an element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes and the current substream completes
*
* '''Cancels when''' downstream cancels
*
* @since 1.2.0
*/
def switchMap[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat] =
new Source(delegate.switchMap(o => f(o)))
/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def initialTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.initialTimeout(timeout))
/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/
@nowarn("msg=deprecated")
def initialTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] =
initialTimeout(timeout.asScala)
/**
* If the stream has not completed before the provided timeout elapses, the stream is failed
* with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def completionTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.completionTimeout(timeout))
/**
* If the stream has not completed before the provided timeout elapses, the stream is failed
* with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/
@nowarn("msg=deprecated")
def completionTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] =
completionTimeout(timeout.asScala)
/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equal to the timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def idleTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.idleTimeout(timeout))
/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equal to the timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/
@nowarn("msg=deprecated")
def idleTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] =
idleTimeout(timeout.asScala)
/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equal to the timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def backpressureTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.backpressureTimeout(timeout))
/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equal to the timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
*
* '''Cancels when''' downstream cancels
*/
@nowarn("msg=deprecated")
def backpressureTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] =
backpressureTimeout(timeout.asScala)
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
* operator attempts to maintain a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] =
new Source(delegate.keepAlive(maxIdle, () => injectedElem.create()))
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
* operator attempts to maintain a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
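*
* A minimal usage sketch from Java, for illustration only. `messages` (a `Source<String, NotUsed>`)
* and the injected value are hypothetical:
*
* {{{
* // If upstream stays idle for one second, inject a "heartbeat" element.
* Source<String, NotUsed> withHeartbeat =
*     messages.keepAlive(Duration.ofSeconds(1), () -> "heartbeat");
* }}}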
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
@nowarn("msg=deprecated")
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] =
keepAlive(maxIdle.asScala, injectedElem)
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this operator sets the maximum rate
* for emitting messages. This operator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be saved for later use, up to the bucket capacity,
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst are delayed proportionally
* to their cost minus the available tokens, meeting the target rate. The bucket is full when the stream has just
* been materialized and started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
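*
* A minimal usage sketch from Java, for illustration only (the rate and element range are assumptions):
*
* {{{
* // At most 100 elements per second; following the list above, a rate of
* // 100/second corresponds to an automatically calculated burst size of 10.
* Source<Integer, NotUsed> limited =
*     Source.range(1, 1000).throttle(100, Duration.ofSeconds(1));
* }}}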
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(elements: Int, per: java.time.Duration): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(elements, per.asScala))
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this operator sets the maximum rate
* for emitting messages. This operator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drop into the bucket at a given rate and can be saved for later use, up to the bucket capacity,
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst are delayed proportionally
* to their cost minus the available tokens, meeting the target rate. The bucket is full when the stream has just been materialized and started.
*
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
* - [[pekko.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[pekko.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses the scheduler to slow down the stream. The scheduler has a minimum interval
* between triggering pushes, so it imposes a minimum pause between emissions and can slow the stream down more than
* requested. This can happen when the burst is 0 and the rate is higher than 30 events per second. Increase
* `maximumBurst` if elements arrive at small intervals (30 milliseconds or less), or use the overloaded `throttle`
* method without the `maximumBurst` parameter to have `maximumBurst` calculated automatically from the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(elements, per, maximumBurst, mode))
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this operator sets the maximum rate
* for emitting messages. This operator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drop into the bucket at a given rate and can be saved for later use, up to the bucket capacity,
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst are delayed proportionally
* to their cost minus the available tokens, meeting the target rate. The bucket is full when the stream has just been materialized and started.
*
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
* - [[pekko.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[pekko.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
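*
* A minimal usage sketch from Java, for illustration only. `events` (a `Source<String, NotUsed>`)
* and the chosen numbers are hypothetical:
*
* {{{
* // 10 elements per second, with up to 5 saved tokens available for bursts.
* // Shaping pauses the stream; ThrottleMode.enforcing() would fail it instead.
* Source<String, NotUsed> shaped =
*     events.throttle(10, Duration.ofSeconds(1), 5, ThrottleMode.shaping());
* }}}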
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses the scheduler to slow down the stream. The scheduler has a minimum interval
* between triggering pushes, so it imposes a minimum pause between emissions and can slow the stream down more than
* requested. This can happen when the burst is 0 and the rate is higher than 30 events per second. Increase
* `maximumBurst` if elements arrive at small intervals (30 milliseconds or less), or use the overloaded `throttle`
* method without the `maximumBurst` parameter to have `maximumBurst` calculated automatically from the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(
elements: Int,
per: java.time.Duration,
maximumBurst: Int,
mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(elements, per.asScala, maximumBurst, mode))
/**
* Sends elements downstream with speed limited to `cost/per`. The cost is
* calculated for each element individually by calling the `costCalculation` function.
* This operator works for streams whose elements have different costs (lengths),
* for example streams of `ByteString`.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be saved for later use, up to the bucket capacity,
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst are delayed proportionally
* to their cost minus the available tokens, meeting the target rate. The bucket is full when the stream has just
* been materialized and started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
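*
* A minimal usage sketch from Java, for illustration only. `chunks` (a `Source<ByteString, NotUsed>`)
* and the byte budget are hypothetical:
*
* {{{
* // Limit throughput to roughly 64 KiB per second; each element costs as
* // many tokens as it has bytes.
* Source<ByteString, NotUsed> limited =
*     chunks.throttle(64 * 1024, Duration.ofSeconds(1), chunk -> chunk.length());
* }}}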
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(
cost: Int,
per: java.time.Duration,
costCalculation: function.Function[Out, Integer]): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(cost, per.asScala, costCalculation.apply _))
/**
* Sends elements downstream with speed limited to `cost/per`. The cost is
* calculated for each element individually by calling the `costCalculation` function.
* This operator works for streams whose elements have different costs (lengths),
* for example streams of `ByteString`.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drop into the bucket at a given rate and can be saved for later use, up to the bucket capacity,
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst are delayed proportionally
* to their cost minus the available tokens, meeting the target rate. The bucket is full when the stream has just been materialized and started.
*
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
* - [[pekko.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[pekko.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses the scheduler to slow down the stream. The scheduler has a minimum interval
* between triggering pushes, so it imposes a minimum pause between emissions and can slow the stream down more than
* requested. This can happen when the burst is 0 and the rate is higher than 30 events per second. Increase
* `maximumBurst` if elements arrive at small intervals (30 milliseconds or less), or use the overloaded `throttle`
* method without the `maximumBurst` parameter to have `maximumBurst` calculated automatically from the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def throttle(
cost: Int,
per: FiniteDuration,
maximumBurst: Int,
costCalculation: function.Function[Out, Integer],
mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode))
/**
* Sends elements downstream with speed limited to `cost/per`. The cost is
* calculated for each element individually by calling the `costCalculation` function.
* This operator works for streams whose elements have different costs (lengths),
* for example streams of `ByteString`.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drop into the bucket at a given rate and can be saved for later use, up to the bucket capacity,
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst are delayed proportionally
* to their cost minus the available tokens, meeting the target rate. The bucket is full when the stream has just been materialized and started.
*
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
* - [[pekko.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[pekko.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses the scheduler to slow down the stream. The scheduler has a minimum interval
* between triggering pushes, so it imposes a minimum pause between emissions and can slow the stream down more than
* requested. This can happen when the burst is 0 and the rate is higher than 30 events per second. Increase
* `maximumBurst` if elements arrive at small intervals (30 milliseconds or less), or use the overloaded `throttle`
* method without the `maximumBurst` parameter to have `maximumBurst` calculated automatically from the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and the configured time per element has elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(
cost: Int,
per: java.time.Duration,
maximumBurst: Int,
costCalculation: function.Function[Out, Integer],
mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(cost, per.asScala, maximumBurst, costCalculation.apply _, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this operator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval has more than the specified number of events, you need to use
* [[throttle]] with the `maximumBurst` parameter.
* @see [[#throttle]]
*/
@deprecated("Use throttle without `maximumBurst` parameter instead.", "Akka 2.5.12")
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttleEven(elements, per, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this operator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval has more than the specified number of events, you need to use
* [[throttle]] with the `maximumBurst` parameter.
* @see [[#throttle]]
*/
@deprecated("Use throttle without `maximumBurst` parameter instead.", "Akka 2.5.12")
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Source[Out, Mat] =
throttleEven(elements, per.asScala, mode)
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this operator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval has more than the specified number of events, you need to use
* [[throttle]] with the `maximumBurst` parameter.
* @see [[#throttle]]
*/
@deprecated("Use throttle without `maximumBurst` parameter instead.", "Akka 2.5.12")
def throttleEven(
cost: Int,
per: FiniteDuration,
costCalculation: (Out) => Int,
mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttleEven(cost, per, costCalculation.apply _, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this operator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval has more than the specified number of events, you need to use
* [[throttle]] with the `maximumBurst` parameter.
* @see [[#throttle]]
*/
@deprecated("Use throttle without `maximumBurst` parameter instead.", "Akka 2.5.12")
def throttleEven(
cost: Int,
per: java.time.Duration,
costCalculation: (Out) => Int,
mode: ThrottleMode): javadsl.Source[Out, Mat] =
throttleEven(cost, per.asScala, costCalculation, mode)
/**
* Detaches upstream demand from downstream demand without detaching the
* stream rates; in other words acts like a buffer of size 1.
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def detach: javadsl.Source[Out, Mat] = new Source(delegate.detach)
/**
* Materializes to a `CompletionStage<Done>` that completes when a termination message is received.
* The `CompletionStage` completes successfully when a complete message is received from upstream or a cancel
* from downstream. It fails with the same error when an error message is received from
* upstream.
*/
def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
new Source(delegate.watchTermination()((left, right) => matF(left, right.asJava)))
/**
* Materializes to `FlowMonitor<Out>` that allows monitoring of the current flow. All events are propagated
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
* event, and may therefore affect performance.
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
*/
@deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17")
def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
new Source(delegate.monitorMat(combinerToScala(combine)))
/**
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
* event, and may therefore affect performance.
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
*/
def monitorMat[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
new Source(delegate.monitorMat(combinerToScala(combine)))
/**
* Materializes to `Pair<Mat, FlowMonitor<Out>>`, which is unlike most other operators (!),
* where the default materialized value semantics is usually to keep the left value
* (by passing `Keep.left()` to a `*Mat` version of a method). This operator is an exception to
* that rule and keeps both values, since its sole purpose is to introduce that materialized value.
*
* The `FlowMonitor` allows monitoring of the current flow. All events are propagated
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
* event, and may therefore affect performance.
*/
def monitor(): Source[Out, Pair[Mat, FlowMonitor[Out]]] =
monitorMat(Keep.both)
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay has already elapsed
*
* '''Backpressures when''' downstream backpressures or the initial delay has not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "Akka 2.5.12")
def initialDelay(delay: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.initialDelay(delay))
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay has already elapsed
*
* '''Backpressures when''' downstream backpressures or the initial delay has not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
@nowarn("msg=deprecated")
def initialDelay(delay: java.time.Duration): javadsl.Source[Out, Mat] =
initialDelay(delay.asScala)
/**
* Replace the attributes of this [[Source]] with the given ones. If this Source is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
* set directly on the individual graphs of the composite.
*/
override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr))
/**
* Add the given attributes to this [[Source]]. If the specific attribute was already present
* on this graph this means the added attribute will be more specific than the existing one.
* If this Source is a composite of multiple graphs, new attributes on the composite will be
* less specific than attributes set directly on the individual graphs of the composite.
*/
override def addAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.addAttributes(attr))
/**
* Add a `name` attribute to this Source.
*/
override def named(name: String): javadsl.Source[Out, Mat] =
new Source(delegate.named(name))
/**
* Put an asynchronous boundary around this `Source`
*/
override def async: javadsl.Source[Out, Mat] =
new Source(delegate.async)
/**
* Put an asynchronous boundary around this `Source`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): javadsl.Source[Out, Mat] =
new Source(delegate.async(dispatcher))
/**
* Put an asynchronous boundary around this `Source`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): javadsl.Source[Out, Mat] =
new Source(delegate.async(dispatcher, inputBufferSize))
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged at debug level, and errors are logged at error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on this Source.
*
* The `extract` function will be applied to each element before logging, so it is possible to log only those fields
* of a complex object flowing through this element.
*
* Uses the given [[LoggingAdapter]] for logging.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Source[Out, Mat] =
new Source(delegate.log(name, e => extract.apply(e))(log))
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged at debug level, and errors are logged at error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on this Source.
*
* The `extract` function will be applied to each element before logging, so it is possible to log only those fields
* of a complex object flowing through this element.
*
* Uses an internally created [[LoggingAdapter]] which uses `org.apache.pekko.stream.Log` as its source (use this class to configure slf4j loggers).
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, extract: function.Function[Out, Any]): javadsl.Source[Out, Mat] =
this.log(name, extract, null)
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged at debug level, and errors are logged at error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on this Source.
*
* Uses the given [[LoggingAdapter]] for logging.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, log: LoggingAdapter): javadsl.Source[Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged at debug level, and errors are logged at error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on this Source.
*
* Uses an internally created [[LoggingAdapter]] which uses `org.apache.pekko.stream.Log` as its source (use this class to configure slf4j loggers).
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def log(name: String): javadsl.Source[Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], null)
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged at debug level, and errors are logged at error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on this Source.
*
* The `extract` function will be applied to each element before logging, so it is possible to log only those fields
* of a complex object flowing through this element.
*
* Uses the given [[MarkerLoggingAdapter]] for logging.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
extract: function.Function[Out, Any],
log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
new Source(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log))
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged at debug level, and errors are logged at error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on this Source.
*
* The `extract` function will be applied to each element before logging, so it is possible to log only those fields
* of a complex object flowing through this element.
*
* Uses an internally created [[MarkerLoggingAdapter]] which uses `org.apache.pekko.stream.Log` as its source (use this class to configure slf4j loggers).
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
extract: function.Function[Out, Any]): javadsl.Source[Out, Mat] =
this.logWithMarker(name, marker, extract, null)
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged at debug level, and errors are logged at error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on this Source.
*
* Uses the given [[MarkerLoggingAdapter]] for logging.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
* By default element and completion signals are logged at debug level, and errors are logged at error level.
* This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on this Source.
*
* Uses an internally created [[MarkerLoggingAdapter]] which uses `org.apache.pekko.stream.Log` as its source (use this class to configure slf4j loggers).
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): javadsl.Source[Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null)
/**
* Transform this source, whose element is `e`, into a source producing the tuple `(e, f(e))`.
*/
def asSourceWithContext[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] =
new scaladsl.SourceWithContext(this.asScala.map(x => (x, extractContext.apply(x)))).asJava
/**
* Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream
* when a custom condition is met, which can be triggered by the aggregate or by a timer.
* It can be thought of as a more general [[groupedWeightedWithin]].
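*
* Ignoring the timer, the interplay of `allocate`, `aggregate` and `harvest` can be sketched over a plain list.
* This is only an illustration, not the operator's implementation: `aggregateAll` is a hypothetical helper, and it
* assumes that a pending aggregate is harvested and emitted once the input ends.
*
* ```scala
* // Hypothetical helper that replays the aggregation logic over an in-memory list.
* def aggregateAll[In, Agg, Emit](in: List[In])(
*     allocate: () => Agg,
*     aggregate: (Agg, In) => (Agg, Boolean),
*     harvest: Agg => Emit): List[Emit] = {
*   val (pending, emitted) = in.foldLeft((Option.empty[Agg], List.empty[Emit])) {
*     case ((current, out), elem) =>
*       // Allocate a fresh aggregate lazily, only when the previous one has been emitted.
*       val (next, ready) = aggregate(current.getOrElse(allocate()), elem)
*       if (ready) (None, harvest(next) :: out) else (Some(next), out)
*   }
*   // Emit whatever is still pending at the end, then restore emission order.
*   (pending.map(harvest).toList ::: emitted).reverse
* }
*
* // Sum integers and emit whenever the running sum reaches 10.
* val sums = aggregateAll(List(4, 7, 3, 3, 5, 1))(
*   () => 0,
*   (acc: Int, x: Int) => (acc + x, acc + x >= 10),
*   (acc: Int) => acc)
* // sums == List(11, 11, 1)
* ```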
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
*
* '''Completes when''' upstream completes and the last aggregate has been emitted downstream
*
* '''Cancels when''' downstream cancels
*
* @param allocate allocate the initial data structure for aggregated elements
* @param aggregate update the aggregated elements, return true if ready to emit after update.
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] =
asScala
.aggregateWithBoundary(() => allocate.get())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
case Pair(predicate, duration) => (agg => predicate.test(agg), duration.asScala)
})