in pulsar/ack_grouping_tracker.go [43:85]
// newAckGroupingTracker builds the ackGroupingTracker selected by options.
//
// A nil options is replaced with defaults: MaxSize 1000 and MaxTime 100ms.
//
// When MaxSize is at most 1, an immediateAckGroupingTracker wired to
// ackIndividual and ackCumulative is returned; ackList is not used on that
// path. Otherwise a timedAckGroupingTracker wired to ackCumulative and ackList
// is returned; ackIndividual is not used on that path.
//
// For the timed tracker, a positive MaxTime additionally creates a ticker with
// that period plus an exit channel, and starts a goroutine that calls flush on
// every tick until a receive from the exit channel succeeds. With a
// non-positive MaxTime neither the ticker, the channel, nor the goroutine is
// created.
func newAckGroupingTracker(options *AckGroupingOptions,
	ackIndividual func(id MessageID),
	ackCumulative func(id MessageID),
	ackList func(ids []*pb.MessageIdData)) ackGroupingTracker {
	if options == nil {
		options = &AckGroupingOptions{MaxSize: 1000, MaxTime: 100 * time.Millisecond}
	}
	maxSize, flushInterval := options.MaxSize, options.MaxTime

	// Grouping at most one ack is no grouping at all: acknowledge right away.
	if maxSize <= 1 {
		immediate := &immediateAckGroupingTracker{
			ackIndividual: ackIndividual,
			ackCumulative: ackCumulative,
		}
		return immediate
	}

	tracker := &timedAckGroupingTracker{
		maxNumAcks:        int(maxSize),
		ackCumulative:     ackCumulative,
		ackList:           ackList,
		pendingAcks:       make(map[[2]uint64]*bitset.BitSet),
		lastCumulativeAck: EarliestMessageID(),
	}

	// Without a positive interval there is no periodic flush to schedule.
	if flushInterval <= 0 {
		return tracker
	}

	tracker.ticker = time.NewTicker(flushInterval)
	tracker.exitCh = make(chan struct{})

	// The loop reads the ticker and exit channel through the tracker's fields
	// on every iteration rather than through captured locals.
	flushLoop := func() {
		for {
			select {
			case <-tracker.ticker.C:
				tracker.flush()
			case <-tracker.exitCh:
				return
			}
		}
	}
	go flushLoop()

	return tracker
}