pulsar/internal/channel_cond.go (45 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package internal import ( "context" "sync" "sync/atomic" "unsafe" ) type chCond struct { L sync.Locker // The pointer to the channel, the channel pointed to may change, // because we will use the channel's close mechanism to implement broadcast notifications. notifyChPtr unsafe.Pointer } func newCond(l sync.Locker) *chCond { c := &chCond{L: l} n := make(chan struct{}) c.notifyChPtr = unsafe.Pointer(&n) return c } // wait for broadcast calls. Similar to regular sync.Cond func (c *chCond) wait() { n := c.notifyChan() c.L.Unlock() <-n c.L.Lock() } // waitWithContext Same as wait() call, but the end condition can also be controlled through the context. func (c *chCond) waitWithContext(ctx context.Context) bool { n := c.notifyChan() c.L.Unlock() defer c.L.Lock() select { case <-n: return true case <-ctx.Done(): return false default: return true } } // broadcast wakes all goroutines waiting on c. // It is not required for the caller to hold c.L during the call. func (c *chCond) broadcast() { n := make(chan struct{}) ptrOld := atomic.SwapPointer(&c.notifyChPtr, unsafe.Pointer(&n)) // close old channels to trigger broadcast. close(*(*chan struct{})(ptrOld)) } func (c *chCond) notifyChan() <-chan struct{} { ptr := atomic.LoadPointer(&c.notifyChPtr) return *((*chan struct{})(ptr)) }