func()

in pkg/processor/batchprocessor.go [63:119]


// poll receives batches from s.batches until the channel is closed and sends
// each one to the X-Ray service through s.xRay.PutTraceSegments.
//
// For every batch:
//   - if the call returns an error, the error is passed to
//     telemetry.EvaluateConnectionError, logged, and the batch is skipped
//     (it is not retried here);
//   - otherwise the batch size is recorded with telemetry.T.SegmentSent and,
//     when the response lists unprocessed segments, each of them is counted
//     with telemetry.T.SegmentRejected and logged together with the trace id
//     and document it belongs to, when those can be recovered from the batch.
//
// After s.batches is closed, poll sends true on s.done and returns.
func (s *segmentsBatch) poll() {
	for batch := range s.batches {
		params := &xray.PutTraceSegmentsInput{
			TraceSegmentDocuments: batch,
		}
		start := time.Now()
		// Send the segments to the X-Ray service.
		r, err := s.xRay.PutTraceSegments(params)
		if err != nil {
			telemetry.EvaluateConnectionError(err)
			log.Errorf("Sending segment batch failed with: %v", err)
			continue
		}
		telemetry.T.SegmentSent(int64(len(batch)))
		elapsed := time.Since(start)

		if len(r.UnprocessedTraceSegments) == 0 {
			log.Infof("Successfully sent batch of %d segments (%1.3f seconds)", len(batch), elapsed.Seconds())
			continue
		}

		log.Infof("Sent batch of %d segments but had %d Unprocessed segments (%1.3f seconds)", len(batch),
			len(r.UnprocessedTraceSegments), elapsed.Seconds())

		// Index the submitted documents by segment id so that each unprocessed
		// segment reported by the service can be mapped back to its document.
		batchesMap := make(map[string]string, len(batch))
		for _, doc := range batch {
			if doc == nil {
				// Nothing to index; dereferencing would panic this goroutine.
				continue
			}
			segIdStrs := segIdRegexp.FindStringSubmatch(*doc)
			if len(segIdStrs) != 2 {
				log.Debugf("Failed to match \"id\" in segment: %v", *doc)
				continue
			}
			batchesMap[segIdStrs[1]] = *doc
		}

		for _, unprocessedSegment := range r.UnprocessedTraceSegments {
			telemetry.T.SegmentRejected(1)
			if unprocessedSegment.Id == nil {
				// Without an id the rejected segment cannot be matched to a
				// document, so dump the whole request and stop going through
				// the remaining unprocessed entries.
				log.Debugf("Received nil unprocessed segment id from X-Ray service: %v", unprocessedSegment)
				log.Debugf("Content in this batch: %v", params)
				break
			}
			// doc is the empty string when the id was not found in the batch.
			doc := batchesMap[*unprocessedSegment.Id]
			traceIdStrs := traceIdRegexp.FindStringSubmatch(doc)
			if len(traceIdStrs) != 2 {
				log.Errorf("Unprocessed segment: %v", unprocessedSegment)
			} else {
				log.Errorf("Unprocessed trace %v, segment: %v", traceIdStrs[1], unprocessedSegment)
			}
			// The document is data, not a format string: log it through a
			// constant format so '%' characters in it are printed verbatim.
			log.Debugf("%s", doc)
		}
	}

	log.Trace("Segment batch: done!")
	s.done <- true
}