in pkg/processor/processor.go [105:140]
// poll is the processor's receive loop. It accumulates trace segments read
// from p.std.Channel into a batch, hands a non-empty batch to sendBatchAsync
// whenever the idle timer fires, and keeps going until p.std.Empty is set
// (which this loop does itself once the channel is closed).
//
// On exit it sends whatever is still pending, closes traceSegmentsBatch,
// waits for one signal on traceSegmentsBatch.done per batch processor
// (batchProcessorCount in total), and finally reports completion on p.Done.
func (p *Processor) poll() {
	pending := make([]*tracesegment.TraceSegment, 0, p.batchSize)
	p.SetIdleTimer()

	// Do/while shape: the select always runs at least once, and p.std.Empty
	// is consulted only after each pass through it.
	for finished := false; !finished; finished = p.std.Empty {
		select {
		case seg, open := <-p.std.Channel:
			if !open {
				// The channel was closed: mark the source as drained so the
				// loop stops after this pass.
				p.std.Empty = true
			} else {
				// receiveTraceSegment returns the batch to keep filling.
				pending = p.receiveTraceSegment(seg, pending)
			}
		case <-p.idleTimer:
			if len(pending) == 0 {
				// Nothing buffered; just re-arm the idle timer.
				p.SetIdleTimer()
			} else {
				log.Debug("processor: sending partial batch")
				pending = p.sendBatchAsync(pending)
			}
		}
	}

	// Hand over anything that was still buffered when the loop ended. The
	// returned batch is not needed any more.
	if len(pending) != 0 {
		p.sendBatchAsync(pending)
	}

	p.traceSegmentsBatch.close()

	// Collect one completion signal per batch processor.
	for received := 0; received < p.batchProcessorCount; received++ {
		<-p.traceSegmentsBatch.done
	}

	log.Debug("processor: done!")
	p.Done <- true
}