in kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt [209:273]
override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
// It is extremely complicated to support sending via `select` for broadcasts,
// as the operation should wait on multiple subscribers simultaneously.
// At the same time, broadcasts are obsolete, so we need a simple implementation
// that works somehow. Here is a tricky work-around. First, we launch a new
// coroutine that performs plain `send(..)` operation and tries to complete
// this `select` via `trySelect`, independently on whether it is in the
// registration or in the waiting phase. On success, the operation finishes.
// On failure, if another clause is already selected or the `select` operation
// has been cancelled, we observe non-linearizable behaviour, as this `onSend`
// clause is completed as well. However, we believe that such a non-linearizability
// is fine for obsolete API. The last case is when the `select` operation is still
// in the registration case, so this `onSend` clause should be re-registered.
// The idea is that we keep information that this `onSend` clause is already selected
// and finish immediately.
@Suppress("UNCHECKED_CAST")
element as E
// First, check whether this `onSend` clause is already
// selected, finishing immediately in this case.
lock.withLock {
val result = onSendInternalResult.remove(select)
if (result != null) { // already selected!
// `result` is either `Unit` ot `CHANNEL_CLOSED`.
select.selectInRegistrationPhase(result)
return
}
}
// Start a new coroutine that performs plain `send(..)`
// and tries to select this `onSend` clause at the end.
CoroutineScope(select.context).launch(start = CoroutineStart.UNDISPATCHED) {
val success: Boolean = try {
send(element)
// The element has been successfully sent!
true
} catch (t: Throwable) {
// This broadcast must be closed. However, it is possible that
// an unrelated exception, such as `OutOfMemoryError` has been thrown.
// This implementation checks that the channel is actually closed,
// re-throwing the caught exception otherwise.
if (isClosedForSend && (t is ClosedSendChannelException || sendException === t)) false
else throw t
}
// Mark this `onSend` clause as selected and
// try to complete the `select` operation.
lock.withLock {
// Status of this `onSend` clause should not be presented yet.
assert { onSendInternalResult[select] == null }
// Success or fail? Put the corresponding result.
onSendInternalResult[select] = if (success) Unit else CHANNEL_CLOSED
// Try to select this `onSend` clause.
select as SelectImplementation<*>
val trySelectResult = select.trySelectDetailed(this@BroadcastChannelImpl, Unit)
if (trySelectResult !== TrySelectDetailedResult.REREGISTER) {
// In case of re-registration (this `select` was still
// in the registration phase), the algorithm will invoke
// `registerSelectForSend`. As we stored an information that
// this `onSend` clause is already selected (in `onSendInternalResult`),
// the algorithm, will complete immediately. Otherwise, to avoid memory
// leaks, we must remove this information from the hashmap.
onSendInternalResult.remove(select)
}
}
}
}