in store/reader.go [182:263]
// FindTraceIDs returns the distinct trace IDs of spans in r.tableName that
// match query.
//
// The query is first checked with validateQuery; its error is returned as-is.
// Service name, operation name, the start-time window and the duration bounds
// are passed to Kusto as declared query parameters. Tag keys and values cannot
// use fixed parameter names, so they are written into the statement through
// the builder's escaping AddString instead of raw string formatting.
// When query.NumTraces is non-zero the result set is limited with the KQL
// `sample` operator.
//
// On any failure (validation, query execution, row decoding, or trace ID
// parsing) the returned slice is nil and the error is non-nil.
func (r *kustoSpanReader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
	if err := validateQuery(query); err != nil {
		return nil, err
	}

	// traceIDRow mirrors the single column produced by `summarize by TraceID`.
	type traceIDRow struct {
		TraceID string `kusto:"TraceID"`
	}

	kustoStmt := kql.New("").AddTable(r.tableName).AddLiteral(getTraceIdBaseQuery)
	kustoParameters := kql.NewParameters()

	if query.ServiceName != "" {
		kustoStmt = kustoStmt.AddLiteral(` | where ProcessServiceName == ParamProcessServiceName`)
		kustoParameters = kustoParameters.AddString("ParamProcessServiceName", query.ServiceName)
	}

	if query.OperationName != "" {
		kustoStmt = kustoStmt.AddLiteral(` | where SpanName == ParamOperationName`)
		kustoParameters = kustoParameters.AddString("ParamOperationName", query.OperationName)
	}

	// Ranging over a nil map is a no-op, so no explicit nil check is needed.
	for k, v := range query.Tags {
		replacedTag := strings.ReplaceAll(k, ".", TagDotReplacementCharacter)
		// Tag keys and values come from the caller; AddString emits them as
		// escaped string literals so they cannot alter the query structure.
		kustoStmt = kustoStmt.
			AddLiteral(" | where TraceAttributes[").AddString(replacedTag).
			AddLiteral("] == ").AddString(v).
			AddLiteral(" or ResourceAttributes[").AddString(replacedTag).
			AddLiteral("] == ").AddString(v)
	}

	kustoStmt = kustoStmt.AddLiteral(` | where StartTime > ParamStartTimeMin`)
	kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMin", query.StartTimeMin)

	kustoStmt = kustoStmt.AddLiteral(` | where StartTime < ParamStartTimeMax`)
	kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMax", query.StartTimeMax)

	if query.DurationMin != 0 {
		kustoStmt = kustoStmt.AddLiteral(` | where Duration > ParamDurationMin`)
		kustoParameters = kustoParameters.AddTimespan("ParamDurationMin", query.DurationMin)
	}

	if query.DurationMax != 0 {
		// Upper bound: keep only spans shorter than DurationMax.
		kustoStmt = kustoStmt.AddLiteral(` | where Duration < ParamDurationMax`)
		kustoParameters = kustoParameters.AddTimespan("ParamDurationMax", query.DurationMax)
	}

	kustoStmt = kustoStmt.AddLiteral("| summarize by TraceID")

	if query.NumTraces != 0 {
		kustoStmt = kustoStmt.AddLiteral(`| sample ParamNumTraces`)
		kustoParameters = kustoParameters.AddInt("ParamNumTraces", int32(query.NumTraces))
	}

	clientRequestID := GetClientId()

	// Cap the capacity of the shared default options so that append always
	// allocates a fresh backing array instead of writing into the one owned
	// by the reader (which other calls may be using at the same time).
	baseOptions := r.defaultReadOptions[:len(r.defaultReadOptions):len(r.defaultReadOptions)]
	queryOptions := append(baseOptions, kusto.ClientRequestID(clientRequestID), kusto.QueryParameters(kustoParameters))

	iter, err := r.client.Query(ctx, r.database, kustoStmt, queryOptions...)
	if err != nil {
		return nil, err
	}
	defer iter.Stop()

	var traceIds []model.TraceID
	err = iter.DoOnRowOrError(
		func(row *table.Row, e *errors.Error) error {
			if e != nil {
				return e
			}
			rec := traceIDRow{}
			if err := row.ToStruct(&rec); err != nil {
				return err
			}
			traceID, err := model.TraceIDFromString(rec.TraceID)
			if err != nil {
				// Do not record an ID that failed to parse.
				return fmt.Errorf("parsing trace ID %q: %w", rec.TraceID, err)
			}
			traceIds = append(traceIds, traceID)
			return nil
		},
	)
	if err != nil {
		return nil, err
	}
	return traceIds, nil
}