public override fun fuse()

in kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt [69:100]


    /**
     * Merges an additional [context], [capacity] and [onBufferOverflow] configuration into this flow.
     *
     * Returns `this` when the merged configuration equals the current one; otherwise returns the
     * result of [create] invoked with the merged values.
     *
     * @param context context to combine with this flow's context; on conflict this flow's own
     * (previously specified, upstream) context takes precedence.
     * @param capacity capacity of the additional buffer; must not be [Channel.CONFLATED].
     * @param onBufferOverflow overflow strategy of the additional buffer.
     */
    public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        // Callers are responsible for desugaring CONFLATED into (0, DROP_OLDEST) before fusing.
        assert { capacity != Channel.CONFLATED }
        // This flow's context is added last, so the previously specified upstream context wins.
        val fusedContext = context + this.context
        val fusedCapacity: Int
        val fusedOverflow: BufferOverflow
        if (onBufferOverflow == BufferOverflow.SUSPEND) {
            // A suspending buffer is merged with the existing one: the current overflow strategy
            // is retained and the two capacities are combined.
            fusedOverflow = this.onBufferOverflow
            fusedCapacity = when {
                // The order of these cases matters: OPTIONAL_CHANNEL is resolved before BUFFERED.
                this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
                capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
                this.capacity == Channel.BUFFERED -> capacity
                capacity == Channel.BUFFERED -> this.capacity
                else -> {
                    // Past the special markers, both capacities are expected to be plain sizes.
                    assert { this.capacity >= 0 }
                    assert { capacity >= 0 }
                    // A negative total means the Int addition overflowed => clamp to UNLIMITED.
                    (this.capacity + capacity).let { total ->
                        if (total < 0) Channel.UNLIMITED else total
                    }
                }
            }
        } else {
            // A non-suspending buffer never suspends, so it replaces whatever buffering
            // configuration preceded it.
            fusedCapacity = capacity
            fusedOverflow = onBufferOverflow
        }
        // Avoid creating a new instance when fusing changed nothing.
        val isUnchanged = fusedContext == this.context &&
            fusedCapacity == this.capacity &&
            fusedOverflow == this.onBufferOverflow
        return if (isUnchanged) this else create(fusedContext, fusedCapacity, fusedOverflow)
    }