in store/reader.go [266:365]
// FindTraces returns the traces that match query.
//
// A single KQL statement is built in two parts. The first is a
// `let TraceIDs = (...)` prelude that applies the filters present in query
// (service name, operation name, tags, start-time window, duration bounds),
// reduces the rows to distinct trace IDs and samples at most query.NumTraces
// of them (defaultNumTraces when NumTraces is zero). The second part selects
// every span inside the same start-time window whose TraceID is in that set.
// The returned spans are grouped by trace ID; the order of the resulting
// traces is unspecified because it follows map iteration order.
//
// An error is returned when validateQuery rejects query, when the Kusto query
// cannot be issued, or when reading or converting any row fails. In every
// error case the returned slice is nil.
func (r *kustoSpanReader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
	if err := validateQuery(query); err != nil {
		return nil, err
	}

	// Use a local limit so the caller's query struct is left untouched.
	numTraces := query.NumTraces
	if numTraces == 0 {
		numTraces = defaultNumTraces
	}

	kustoStmt := kql.New("let TraceIDs = (").AddTable(r.tableName).AddLiteral(getTracesBaseQuery)
	kustoParameters := kql.NewParameters()

	if query.ServiceName != "" {
		kustoStmt = kustoStmt.AddLiteral(` | where ProcessServiceName == ParamProcessServiceName`)
		kustoParameters = kustoParameters.AddString("ParamProcessServiceName", query.ServiceName)
	}

	if query.OperationName != "" {
		kustoStmt = kustoStmt.AddLiteral(` | where SpanName == ParamOperationName`)
		kustoParameters = kustoParameters.AddString("ParamOperationName", query.OperationName)
	}

	// Tag keys and values come from the caller, so they are emitted through
	// the builder's AddString (a quoted string literal) instead of being
	// spliced into the statement as raw text.
	for k, v := range query.Tags {
		kustoStmt = kustoStmt.
			AddLiteral(" | where TraceAttributes[").AddString(k).
			AddLiteral("] == ").AddString(v).
			AddLiteral(" or ResourceAttributes[").AddString(k).
			AddLiteral("] == ").AddString(v)
	}

	// The start-time parameters are declared once and referenced by both the
	// TraceIDs prelude and the main query below.
	kustoStmt = kustoStmt.AddLiteral(` | where StartTime > ParamStartTimeMin`)
	kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMin", query.StartTimeMin)
	kustoStmt = kustoStmt.AddLiteral(` | where StartTime < ParamStartTimeMax`)
	kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMax", query.StartTimeMax)

	if query.DurationMin != 0 {
		kustoStmt = kustoStmt.AddLiteral(` | where Duration > ParamDurationMin`)
		kustoParameters = kustoParameters.AddTimespan("ParamDurationMin", query.DurationMin)
	}

	if query.DurationMax != 0 {
		// Upper bound: keep only spans shorter than DurationMax.
		kustoStmt = kustoStmt.AddLiteral(` | where Duration < ParamDurationMax`)
		kustoParameters = kustoParameters.AddTimespan("ParamDurationMax", query.DurationMax)
	}

	kustoStmt = kustoStmt.AddLiteral(" | summarize by TraceID")
	kustoStmt = kustoStmt.AddLiteral(` | sample ParamNumTraces`)
	kustoParameters = kustoParameters.AddInt("ParamNumTraces", int32(numTraces))

	// Main query: all spans in the time window that belong to a sampled trace.
	kustoStmt = kustoStmt.AddLiteral(`); `).AddTable(r.tableName).AddLiteral(getTracesBaseQuery)
	kustoStmt = kustoStmt.AddLiteral(` | where StartTime > ParamStartTimeMin`)
	kustoStmt = kustoStmt.AddLiteral(` | where StartTime < ParamStartTimeMax`)
	kustoStmt = kustoStmt.AddLiteral(` | where TraceID in (TraceIDs) | project-rename Tags=TraceAttributes,Logs=Events,ProcessTags=ResourceAttributes|extend References=iff(isempty(ParentID),todynamic("[]"),pack_array(bag_pack("refType","CHILD_OF","traceID",TraceID,"spanID",ParentID)))`)

	r.logger.Debug(kustoStmt.String())

	clientRequestID := GetClientId()

	// Cap the capacity of the shared default options so append always copies
	// and never writes into the backing array held by the reader.
	n := len(r.defaultReadOptions)
	opts := append(r.defaultReadOptions[:n:n], kusto.ClientRequestID(clientRequestID), kusto.QueryParameters(kustoParameters))

	iter, err := r.client.Query(ctx, r.database, kustoStmt, opts...)
	if err != nil {
		return nil, fmt.Errorf("querying traces: %w", err)
	}
	defer iter.Stop()

	spansByTrace := make(map[model.TraceID][]*model.Span)
	err = iter.DoOnRowOrError(func(row *table.Row, e *errors.Error) error {
		if e != nil {
			return e
		}
		rec := kustoSpan{}
		if err := row.ToStruct(&rec); err != nil {
			return err
		}
		span, err := transformKustoSpanToModelSpan(&rec, r.logger)
		if err != nil {
			return err
		}
		spansByTrace[span.TraceID] = append(spansByTrace[span.TraceID], span)
		return nil
	})
	if err != nil {
		// Do not hand back a partially populated result alongside an error.
		return nil, fmt.Errorf("reading trace rows: %w", err)
	}

	var traces []*model.Trace
	for _, spans := range spansByTrace {
		traces = append(traces, &model.Trace{Spans: spans})
	}
	return traces, nil
}