func()

in cmd/tracing/daemon.go [275:339]


// poll is the daemon's receive loop. Each iteration takes a buffer from
// d.pool, fills it through d.read, splits the received bytes into a header
// and a body with util.SplitHeaderBody, validates the JSON header, and hands
// the body to d.std wrapped in a TraceSegment that carries the pooled buffer.
//
// When the pool has no buffer to give, the read goes into a single fallback
// buffer owned by this loop; whatever lands there is dropped and counted as
// spillover, and the fallback buffer is never returned to the pool.
//
// Every rejection path returns the pooled buffer to d.pool before continuing.
// On the success path the buffer travels with the TraceSegment (PoolBuf) and
// is not returned here.
//
// The loop ends only when d.read reports a negative length.
// NOTE(review): d.read is not visible in this excerpt; this function treats a
// negative result as "stop polling" and zero as "nothing usable was read" --
// confirm against its implementation.
func (d *Daemon) poll() {
	separator := []byte(protocolSeparator)
	fallBackBuffer := make([]byte, receiveBufferSize)
	splitBuf := make([][]byte, 2)

	for {
		bufPointer := d.pool.Get()
		fallbackPointerUsed := bufPointer == nil
		if fallbackPointerUsed {
			log.Debug("Pool does not have any buffer.")
			bufPointer = &fallBackBuffer
		}

		rlen := d.read(bufPointer)

		// The termination check must come before the fallback handling:
		// otherwise a stop signal that arrives while the pool is empty is
		// misreported as a dropped segment and the loop never exits. Any
		// negative length is treated as terminal because slicing the buffer
		// with it below would panic.
		if rlen < 0 {
			return
		}
		if rlen == 0 {
			if !fallbackPointerUsed {
				d.pool.Return(bufPointer)
			}
			continue
		}
		telemetry.T.SegmentReceived(1)

		if fallbackPointerUsed {
			// The fallback buffer is reused on the next iteration, so its
			// contents cannot be handed off for processing.
			log.Warn("Segment dropped. Consider increasing memory limit")
			telemetry.T.SegmentSpillover(1)
			continue
		}

		bufMessage := (*bufPointer)[:rlen]

		parts := util.SplitHeaderBody(&bufMessage, &separator, &splitBuf)
		if len(parts[1]) == 0 {
			log.Warnf("Missing header or segment: %s", string(parts[0]))
			d.pool.Return(bufPointer)
			telemetry.T.SegmentRejected(1)
			continue
		}

		// payload is declared per iteration, so the pointer stored in the
		// TraceSegment below is not overwritten by later iterations.
		header, payload := parts[0], parts[1]

		// A header that does not decode is rejected the same way as one that
		// decodes but fails validation.
		var headerInfo tracesegment.Header
		if err := json.Unmarshal(header, &headerInfo); err != nil || !headerInfo.IsValid() {
			log.Warnf("Invalid header: %s", string(header))
			d.pool.Return(bufPointer)
			telemetry.T.SegmentRejected(1)
			continue
		}

		ts := &tracesegment.TraceSegment{
			Raw:     &payload,
			PoolBuf: bufPointer,
		}

		atomic.AddUint64(&d.count, 1)
		d.std.Send(ts)
	}
}