in util/queue/queue.go [239:287]
func (q *Queue) Send(ctx context.Context, receivers chan<- string, frequency time.Duration) error {
var next func() (*string, time.Time)
if frequency == 0 {
next = func() (*string, time.Time) {
if len(q.queue) == 0 {
return nil, time.Time{}
}
it := heap.Pop(&q.queue).(*item)
return &it.name, it.when
}
} else {
next = func() (*string, time.Time) {
it := q.queue.peek()
if it == nil {
return nil, time.Time{}
}
when := it.when
it.when = time.Now().Add(frequency)
heap.Fix(&q.queue, it.index)
return &it.name, when
}
}
for {
if err := ctx.Err(); err != nil {
return err
}
q.lock.Lock()
who, when := next()
q.lock.Unlock()
if who == nil {
if frequency == 0 {
return nil
}
q.sleep(time.Second)
continue
}
if dur := time.Until(when); dur > 0 {
q.sleep(dur)
}
select {
case receivers <- *who:
case <-ctx.Done():
return ctx.Err()
}
}
}