func New()

in pkg/processor/processor.go [66:103]


// New constructs a Processor and starts its background goroutines before
// returning it.
//
// An X-Ray client is obtained from conn.NewXRay(awsConfig, s); when that call
// yields nil, an error is logged and the process exits with status 1.
// Otherwise New launches segmentBatchProcessorCount goroutines running the
// segment batch's poll method, plus one goroutine running the Processor's own
// poll method. The batch queue size, batch size and idle timeout are taken
// from c.Processor.
func New(awsConfig *aws.Config, s *session.Session, segmentBatchProcessorCount int, std *ringbuffer.RingBuffer,
	pool *bufferpool.BufferPool, c *cfg.ParameterConfig) *Processor {
	// Buffered queue feeding the segment batch pollers.
	queue := make(chan []*string, c.Processor.BatchProcessorQueueSize)

	xrayClient := conn.NewXRay(awsConfig, s)
	if xrayClient == nil {
		log.Error("X-Ray client returned nil")
		os.Exit(1)
	}

	batch := &segmentsBatch{
		batches: queue,
		done:    make(chan bool),
		randGen: rand.New(rand.NewSource(time.Now().UnixNano())),
		timer:   &timer.Client{},
		xRay:    xrayClient,
	}

	log.Debugf("Batch size: %v", c.Processor.BatchSize)

	idleTimeout := time.Duration(c.Processor.IdleTimeoutMillisecond) * time.Millisecond
	proc := &Processor{
		Done:                make(chan bool),
		std:                 std,
		pool:                pool,
		count:               0,
		timerClient:         &timer.Client{},
		batchProcessorCount: segmentBatchProcessorCount,
		traceSegmentsBatch:  batch,
		batchSize:           c.Processor.BatchSize,
		sendIdleTimeout:     idleTimeout,
	}

	// One poller goroutine per configured batch processor.
	for remaining := proc.batchProcessorCount; remaining > 0; remaining-- {
		go batch.poll()
	}

	go proc.poll()

	return proc
}