override fun registerSelectForSend()

in kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt [209:273]


    override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
        // Proper `select`-based sending on a broadcast would have to wait on many
        // subscribers at once, which is extremely complicated. Since broadcasts are
        // obsolete, a deliberately simple work-around is used instead:
        //
        //  1. A helper coroutine performs a plain `send(..)` and afterwards tries to
        //     complete this `select` via `trySelect`, regardless of whether the `select`
        //     is in its registration phase or in its waiting phase.
        //  2. If that succeeds, the operation is finished.
        //  3. If another clause has already been selected, or the `select` has been
        //     cancelled, this `onSend` clause has nevertheless completed. This is
        //     non-linearizable, which is considered acceptable for an obsolete API.
        //  4. If the `select` is still in its registration phase, this `onSend` clause
        //     is re-registered. The outcome of the send is remembered in
        //     `onSendInternalResult`, so the repeated registration finishes immediately.
        @Suppress("UNCHECKED_CAST")
        val item = element as E
        // Fast path: a previous registration attempt has already recorded
        // the outcome for this `select`, so consume it and finish right away.
        lock.withLock {
            onSendInternalResult.remove(select)?.let { storedResult ->
                // `storedResult` is either `Unit` or `CHANNEL_CLOSED`.
                select.selectInRegistrationPhase(storedResult)
                return
            }
        }
        // Slow path: run a plain `send(..)` in a fresh coroutine and
        // attempt to select this `onSend` clause once it completes.
        CoroutineScope(select.context).launch(start = CoroutineStart.UNDISPATCHED) {
            val sent: Boolean = try {
                send(item)
                // The element has been delivered.
                true
            } catch (cause: Throwable) {
                // A failure is expected to mean that this broadcast is closed, yet an
                // unrelated error (e.g. `OutOfMemoryError`) may surface here as well.
                // Only treat the failure as "closed" when the channel really is closed
                // for sending and the exception matches; anything else is re-thrown.
                val closedForSend = isClosedForSend &&
                    (cause is ClosedSendChannelException || sendException === cause)
                if (!closedForSend) throw cause
                false
            }
            // The value to hand to the `select` if this clause gets re-registered.
            val clauseResult = if (sent) Unit else CHANNEL_CLOSED
            // Record this `onSend` clause as selected and
            // attempt to complete the `select` operation.
            lock.withLock {
                // No outcome may be recorded for this `select` at this point.
                assert { onSendInternalResult[select] == null }
                onSendInternalResult[select] = clauseResult
                // Attempt to select this `onSend` clause.
                val outcome = (select as SelectImplementation<*>)
                    .trySelectDetailed(this@BroadcastChannelImpl, Unit)
                if (outcome !== TrySelectDetailedResult.REREGISTER) {
                    // On `REREGISTER` (the `select` was still in its registration phase)
                    // the algorithm calls `registerSelectForSend` again, finds the entry
                    // stored above and completes immediately. For every other outcome
                    // nobody will consume the entry, so drop it to avoid a memory leak.
                    onSendInternalResult.remove(select)
                }
            }
        }
    }