in kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt [69:100]
/**
 * Merges this flow's configuration with an additional [context], [capacity] and [onBufferOverflow]
 * request and returns a flow carrying the merged configuration.
 *
 * - **Context:** computed as `context + this.context`, so the context specified earlier (upstream)
 *   takes precedence.
 * - **Non-suspending overflow strategy:** any [onBufferOverflow] other than [BufferOverflow.SUSPEND]
 *   overwrites both the capacity and the overflow strategy with the requested values.
 * - **Suspending overflow strategy:** the current overflow strategy is kept and the capacities are
 *   merged. Checked in order: if either side is [Channel.OPTIONAL_CHANNEL] or [Channel.BUFFERED],
 *   the other side's value is used; otherwise the two capacities are added, and a sum that wraps
 *   around to a negative number is replaced by [Channel.UNLIMITED].
 *
 * @param context the context to combine with this flow's context.
 * @param capacity the requested capacity; must not be [Channel.CONFLATED] (checked by an assertion).
 * @param onBufferOverflow the requested overflow strategy.
 * @return `this` when the merged configuration equals the current one, otherwise the result of [create].
 */
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
    // Callers must have desugared CONFLATED into (0, DROP_OLDEST) before getting here.
    assert { capacity != Channel.CONFLATED }
    // The previously specified (upstream) context takes precedence.
    val fusedContext = context + this.context
    val keepsSuspending = onBufferOverflow == BufferOverflow.SUSPEND
    val fusedCapacity = when {
        // A buffer that never suspends overwrites the preceding buffering configuration.
        !keepsSuspending -> capacity
        // From here on the capacities of both buffers are combined.
        this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
        capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
        this.capacity == Channel.BUFFERED -> capacity
        capacity == Channel.BUFFERED -> this.capacity
        else -> {
            // Sanity checks: only explicit non-negative sizes are expected to reach this branch.
            assert { this.capacity >= 0 }
            assert { capacity >= 0 }
            val total = this.capacity + capacity
            // A negative total means the Int addition overflowed => clamp to UNLIMITED.
            if (total < 0) Channel.UNLIMITED else total
        }
    }
    // When the capacities were combined the previous overflow strategy stays in effect.
    val fusedOverflow = if (keepsSuspending) this.onBufferOverflow else onBufferOverflow
    val nothingChanged =
        fusedContext == this.context && fusedCapacity == this.capacity && fusedOverflow == this.onBufferOverflow
    // Return this instance when the configuration is unchanged instead of calling create.
    return if (nothingChanged) this else create(fusedContext, fusedCapacity, fusedOverflow)
}