in cmd/tracing/daemon.go [275:339]
func (d *Daemon) poll() {
separator := []byte(protocolSeparator)
fallBackBuffer := make([]byte, receiveBufferSize)
splitBuf := make([][]byte, 2)
for {
bufPointer := d.pool.Get()
fallbackPointerUsed := false
if bufPointer == nil {
log.Debug("Pool does not have any buffer.")
bufPointer = &fallBackBuffer
fallbackPointerUsed = true
}
rlen := d.read(bufPointer)
if rlen > 0 {
telemetry.T.SegmentReceived(1)
}
if rlen == 0 {
if !fallbackPointerUsed {
d.pool.Return(bufPointer)
}
continue
}
if fallbackPointerUsed {
log.Warn("Segment dropped. Consider increasing memory limit")
telemetry.T.SegmentSpillover(1)
continue
} else if rlen == -1 {
return
}
buf := *bufPointer
bufMessage := buf[0:rlen]
slices := util.SplitHeaderBody(&bufMessage, &separator, &splitBuf)
if len(slices[1]) == 0 {
log.Warnf("Missing header or segment: %s", string(slices[0]))
d.pool.Return(bufPointer)
telemetry.T.SegmentRejected(1)
continue
}
header := slices[0]
payload := slices[1]
headerInfo := tracesegment.Header{}
json.Unmarshal(header, &headerInfo)
switch headerInfo.IsValid() {
case true:
default:
log.Warnf("Invalid header: %s", string(header))
d.pool.Return(bufPointer)
telemetry.T.SegmentRejected(1)
continue
}
ts := &tracesegment.TraceSegment{
Raw: &payload,
PoolBuf: bufPointer,
}
atomic.AddUint64(&d.count, 1)
d.std.Send(ts)
}
}