in pkg/processor/processor.go [66:103]
func New(awsConfig *aws.Config, s *session.Session, segmentBatchProcessorCount int, std *ringbuffer.RingBuffer,
pool *bufferpool.BufferPool, c *cfg.ParameterConfig) *Processor {
batchesChan := make(chan []*string, c.Processor.BatchProcessorQueueSize)
segmentBatchDoneChan := make(chan bool)
tsb := &segmentsBatch{
batches: batchesChan,
done: segmentBatchDoneChan,
randGen: rand.New(rand.NewSource(time.Now().UnixNano())),
timer: &timer.Client{},
}
x := conn.NewXRay(awsConfig, s)
if x == nil {
log.Error("X-Ray client returned nil")
os.Exit(1)
}
tsb.xRay = x
doneChan := make(chan bool)
log.Debugf("Batch size: %v", c.Processor.BatchSize)
p := &Processor{
Done: doneChan,
std: std,
pool: pool,
count: 0,
timerClient: &timer.Client{},
batchProcessorCount: segmentBatchProcessorCount,
traceSegmentsBatch: tsb,
batchSize: c.Processor.BatchSize,
sendIdleTimeout: time.Millisecond * time.Duration(c.Processor.IdleTimeoutMillisecond),
}
for i := 0; i < p.batchProcessorCount; i++ {
go p.traceSegmentsBatch.poll()
}
go p.poll()
return p
}