func newAckGroupingTracker()

in pulsar/ack_grouping_tracker.go [43:85]


// newAckGroupingTracker builds the ackGroupingTracker selected by options.
//
// A nil options is replaced by the defaults MaxSize = 1000 and
// MaxTime = 100ms.
//
// When MaxSize <= 1 the result is an immediateAckGroupingTracker wired with
// ackIndividual and ackCumulative; ackList is not handed to it.
//
// Otherwise the result is a timedAckGroupingTracker holding at most MaxSize
// acks (maxNumAcks), wired with ackCumulative and ackList; ackIndividual is not
// handed to it. If MaxTime is positive, a ticker with that period is created
// and a background goroutine calls flush on every tick until a receive on
// exitCh succeeds. If MaxTime is not positive, ticker and exitCh are left
// unset and no goroutine is started.
//
// NOTE(review): whoever signals exitCh and stops the ticker lives outside this
// function — presumably the tracker's close path; confirm.
func newAckGroupingTracker(options *AckGroupingOptions,
	ackIndividual func(id MessageID),
	ackCumulative func(id MessageID),
	ackList func(ids []*pb.MessageIdData)) ackGroupingTracker {
	// Fall back to the default grouping configuration when none was supplied.
	opts := options
	if opts == nil {
		opts = &AckGroupingOptions{MaxSize: 1000, MaxTime: 100 * time.Millisecond}
	}

	// A group size of one (or less) means there is nothing to batch.
	if opts.MaxSize <= 1 {
		immediate := &immediateAckGroupingTracker{
			ackIndividual: ackIndividual,
			ackCumulative: ackCumulative,
		}
		return immediate
	}

	tracker := &timedAckGroupingTracker{
		maxNumAcks:        int(opts.MaxSize),
		ackCumulative:     ackCumulative,
		ackList:           ackList,
		pendingAcks:       make(map[[2]uint64]*bitset.BitSet),
		lastCumulativeAck: EarliestMessageID(),
	}

	// Without a positive interval there is no periodic flush to schedule.
	if opts.MaxTime <= 0 {
		return tracker
	}

	tracker.exitCh = make(chan struct{})
	tracker.ticker = time.NewTicker(opts.MaxTime)

	// The loop reads the ticker and exit channel through the tracker on every
	// iteration rather than capturing them once.
	flushLoop := func() {
		for {
			select {
			case <-tracker.ticker.C:
				tracker.flush()
			case <-tracker.exitCh:
				return
			}
		}
	}
	go flushLoop()

	return tracker
}