in internal/trace.go [340:388]
func (td *traceDispatcher) process(maxWaitTime int64) {
var count int
var batch []TraceContext
lastput := time.Now()
for {
select {
case ctx := <-td.input:
count++
lastput = time.Now()
batch = append(batch, ctx)
if count == batchSize {
count = 0
batchSend := batch
go primitive.WithRecover(func() {
td.batchCommit(batchSend)
})
batch = make([]TraceContext, 0)
}
case <-td.ticker.C:
delta := time.Since(lastput).Nanoseconds()
if delta > maxWaitTime {
count++
lastput = time.Now()
if len(batch) > 0 {
batchSend := batch
go primitive.WithRecover(func() {
td.batchCommit(batchSend)
})
batch = make([]TraceContext, 0)
}
}
case <-td.ctx.Done():
batchSend := batch
go primitive.WithRecover(func() {
td.batchCommit(batchSend)
})
batch = make([]TraceContext, 0)
now := time.Now().UnixNano() / int64(time.Millisecond)
end := now + 500
for now < end {
now = time.Now().UnixNano() / int64(time.Millisecond)
runtime.Gosched()
}
rlog.Info(fmt.Sprintf("------end trace send %v %v", td.input, td.batchCh), nil)
return
}
}
}