in pkg/processor/batchprocessor.go [63:119]
func (s *segmentsBatch) poll() {
for {
batch, ok := <-s.batches
if ok {
params := &xray.PutTraceSegmentsInput{
TraceSegmentDocuments: batch,
}
start := time.Now()
// send segment to X-Ray service.
r, err := s.xRay.PutTraceSegments(params)
if err != nil {
telemetry.EvaluateConnectionError(err)
log.Errorf("Sending segment batch failed with: %v", err)
continue
} else {
telemetry.T.SegmentSent(int64(len(batch)))
}
elapsed := time.Since(start)
if len(r.UnprocessedTraceSegments) != 0 {
log.Infof("Sent batch of %d segments but had %d Unprocessed segments (%1.3f seconds)", len(batch),
len(r.UnprocessedTraceSegments), elapsed.Seconds())
batchesMap := make(map[string]string)
for i := 0; i < len(batch); i++ {
segIdStrs := segIdRegexp.FindStringSubmatch(*batch[i])
if len(segIdStrs) != 2 {
log.Debugf("Failed to match \"id\" in segment: %v", *batch[i])
continue
}
batchesMap[segIdStrs[1]] = *batch[i]
}
for _, unprocessedSegment := range r.UnprocessedTraceSegments {
telemetry.T.SegmentRejected(1)
// Print all segments since don't know which exact one is invalid.
if unprocessedSegment.Id == nil {
log.Debugf("Received nil unprocessed segment id from X-Ray service: %v", unprocessedSegment)
log.Debugf("Content in this batch: %v", params)
break
}
traceIdStrs := traceIdRegexp.FindStringSubmatch(batchesMap[*unprocessedSegment.Id])
if len(traceIdStrs) != 2 {
log.Errorf("Unprocessed segment: %v", unprocessedSegment)
} else {
log.Errorf("Unprocessed trace %v, segment: %v", traceIdStrs[1], unprocessedSegment)
}
log.Debugf(batchesMap[*unprocessedSegment.Id])
}
} else {
log.Infof("Successfully sent batch of %d segments (%1.3f seconds)", len(batch), elapsed.Seconds())
}
} else {
log.Trace("Segment batch: done!")
s.done <- true
break
}
}
}