func()

in pkg/processor/processor.go [105:140]


// poll is the processor's main loop. It gathers trace segments read from
// p.std.Channel into a batch, hands a partial batch to sendBatchAsync when the
// idle timer fires, and shuts down once the channel has been closed: whatever
// is still buffered is sent, traceSegmentsBatch is closed, one value per batch
// processor is received from traceSegmentsBatch.done, and finally true is sent
// on p.Done.
func (p *Processor) poll() {
	pending := make([]*tracesegment.TraceSegment, 0, p.batchSize)
	p.SetIdleTimer()

	// The exit condition is evaluated after each select (never before the
	// first one), so at least one event is always handled.
	for drained := false; !drained; drained = p.std.Empty {
		select {
		case seg, open := <-p.std.Channel:
			if !open {
				// Closed channel: record it so the loop ends.
				p.std.Empty = true
			} else {
				pending = p.receiveTraceSegment(seg, pending)
			}
		case <-p.idleTimer:
			if len(pending) == 0 {
				// Nothing buffered; just set the idle timer again.
				p.SetIdleTimer()
			} else {
				log.Debug("processor: sending partial batch")
				// NOTE(review): the idle timer is not set again on this path;
				// presumably sendBatchAsync takes care of it — confirm.
				pending = p.sendBatchAsync(pending)
			}
		}
	}

	// Flush what is left over; the slice handed back is not needed any more.
	if len(pending) != 0 {
		_ = p.sendBatchAsync(pending)
	}
	p.traceSegmentsBatch.close()

	// Receive once from done per batch processor before reporting completion.
	for worker := 0; worker < p.batchProcessorCount; worker++ {
		<-p.traceSegmentsBatch.done
	}

	log.Debug("processor: done!")
	p.Done <- true
}