store/reader.go (328 lines of code) (raw):

package store import ( "context" "fmt" "strings" "time" "github.com/Azure/azure-kusto-go/kusto/data/value" "github.com/google/uuid" "github.com/hashicorp/go-hclog" "github.com/Azure/azure-kusto-go/kusto" "github.com/Azure/azure-kusto-go/kusto/data/errors" "github.com/Azure/azure-kusto-go/kusto/data/table" "github.com/Azure/azure-kusto-go/kusto/kql" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/spanstore" ) type kustoSpanReader struct { client kustoReaderClient database string tableName string logger hclog.Logger defaultReadOptions []kusto.QueryOption } type kustoReaderClient interface { Query(ctx context.Context, db string, query kusto.Statement, options ...kusto.QueryOption) (*kusto.RowIterator, error) } var queryMap = map[string]string{} func newKustoSpanReader(factory *kustoFactory, logger hclog.Logger, defaultReadOptions []kusto.QueryOption) (*kustoSpanReader, error) { return &kustoSpanReader{ factory.Reader(), factory.Database, factory.Table, logger, defaultReadOptions, }, nil } const defaultNumTraces = 20 func GetClientId() string { // get a UUID and concatenante with the service name return fmt.Sprintf("azure-kusto-jaeger-%s", uuid.New().String()) } // GetTrace finds trace by TraceID func (r *kustoSpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { kustoStmt := kql.New("").AddTable(r.tableName).AddLiteral(getTraceQuery) kustoStmtParams := kql.NewParameters().AddString("ParamTraceID", traceID.String()) clientRequestId := GetClientId() // Append a client request id as well to the request iter, err := r.client.Query(ctx, r.database, kustoStmt, append(r.defaultReadOptions, kusto.ClientRequestID(clientRequestId), kusto.QueryParameters(kustoStmtParams))...) if err != nil { r.logger.Error("Failed running GetTrace query. TraceID: %s. ClientRequestId : %s", traceID.String(), clientRequestId) return nil, err } defer iter.Stop() var spans []*model.Span err = iter.DoOnRowOrError( func(row *table.Row, e *errors.Error) error { if e != nil { return e } rec := kustoSpan{} if err := row.ToStruct(&rec); err != nil { return err } var span *model.Span span, err = transformKustoSpanToModelSpan(&rec, r.logger) if err != nil { r.logger.Error(fmt.Sprintf("Error in transformKustoSpanToModelSpan. TraceId: %s SpanId: %s", rec.TraceID, rec.SpanID), err) return err } spans = append(spans, span) return nil }, ) trace := model.Trace{Spans: spans} return &trace, err } // GetServices finds all possible services that spanstore contains func (r *kustoSpanReader) GetServices(ctx context.Context) ([]string, error) { clientRequestId := GetClientId() kustoStmt := kql.New(queryResultsCacheAge).AddTable(r.tableName).AddLiteral(getServicesQuery) r.logger.Debug("GetServicesQuery : %s ", kustoStmt.String()) iter, err := r.client.Query(ctx, r.database, kustoStmt, append(r.defaultReadOptions, kusto.ClientRequestID(clientRequestId))...) if err != nil { r.logger.Error("Failed running GetServices query. ClientRequestId : %s", clientRequestId) return nil, err } defer iter.Stop() type Service struct { ServiceName string `kusto:"ProcessServiceName"` } var services []string err = iter.DoOnRowOrError( func(row *table.Row, e *errors.Error) error { if e != nil { return e } service := Service{} if err := row.ToStruct(&service); err != nil { return err } services = append(services, service.ServiceName) return nil }, ) if err != nil { return nil, err } return services, err } // GetOperations finds all operations by provided Service and SpanKind func (r *kustoSpanReader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) { type Operation struct { OperationName string `kusto:"OperationName"` SpanKind string `kusto:"SpanKind"` } clientRequestId := GetClientId() var iter *kusto.RowIterator var err error if query.ServiceName == "" && query.SpanKind == "" { kustoStmt := kql.New(queryResultsCacheAge).AddTable(r.tableName).AddLiteral(getOpsWithNoParamsQuery) iter, err = r.client.Query(ctx, r.database, kustoStmt, append(r.defaultReadOptions, kusto.ClientRequestID(clientRequestId))...) } if query.ServiceName != "" && query.SpanKind == "" { kustoStmt := kql.New(queryResultsCacheAge).AddTable(r.tableName).AddLiteral(getOpsWithParamsQuery) kustoStmtParams := kql.NewParameters().AddString("ParamProcessServiceName", query.ServiceName) iter, err = r.client.Query(ctx, r.database, kustoStmt, append(r.defaultReadOptions, kusto.ClientRequestID(clientRequestId), kusto.QueryParameters(kustoStmtParams))...) } if err != nil { r.logger.Error("Failed running GetOperations query. ClientRequestId : %s", clientRequestId) return nil, err } defer iter.Stop() operations := []spanstore.Operation{} err = iter.DoOnRowOrError( func(row *table.Row, e *errors.Error) error { if e != nil { return e } operation := Operation{} if err := row.ToStruct(&operation); err != nil { return err } operations = append(operations, spanstore.Operation{ Name: operation.OperationName, SpanKind: operation.SpanKind, }) return nil }, ) if err != nil { return nil, err } return operations, err } // FindTraceIDs finds TraceIDs by provided query func (r *kustoSpanReader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) { if err := validateQuery(query); err != nil { return nil, err } type TraceID struct { TraceID string `kusto:"TraceID"` } kustoStmt := kql.New("").AddTable(r.tableName).AddLiteral(getTraceIdBaseQuery) kustoParameters := kql.NewParameters() if query.ServiceName != "" { kustoStmt = kustoStmt.AddLiteral(` | where ProcessServiceName == ParamProcessServiceName`) kustoParameters = kustoParameters.AddString("ParamProcessServiceName", query.ServiceName) } if query.OperationName != "" { kustoStmt = kustoStmt.AddLiteral(` | where SpanName == ParamOperationName`) kustoParameters = kustoParameters.AddString("ParamOperationName", query.OperationName) } if query.Tags != nil { for k, v := range query.Tags { replacedTag := strings.ReplaceAll(k, ".", TagDotReplacementCharacter) tagFilter := fmt.Sprintf(" | where TraceAttributes['%s'] == '%s' or ResourceAttributes['%s'] == '%s'", replacedTag, v, replacedTag, v) kustoStmt = kustoStmt.AddUnsafe(tagFilter) } } kustoStmt = kustoStmt.AddLiteral(` | where StartTime > ParamStartTimeMin`) kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMin", query.StartTimeMin) kustoStmt = kustoStmt.AddLiteral(` | where StartTime < ParamStartTimeMax`) kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMax", query.StartTimeMax) if query.DurationMin != 0 { kustoStmt = kustoStmt.AddLiteral(` | where Duration > ParamDurationMin`) kustoParameters = kustoParameters.AddTimespan("ParamDurationMin", query.DurationMin) } if query.DurationMax != 0 { kustoStmt = kustoStmt.AddLiteral(` | where Duration > ParamDurationMax`) kustoParameters = kustoParameters.AddTimespan("ParamDurationMax", query.DurationMax) } kustoStmt = kustoStmt.AddLiteral("| summarize by TraceID") if query.NumTraces != 0 { kustoStmt.AddLiteral(`| sample ParamNumTraces`) kustoParameters = kustoParameters.AddInt("ParamNumTraces", int32(query.NumTraces)) } clientRequestId := GetClientId() iter, err := r.client.Query(ctx, r.database, kustoStmt, append(r.defaultReadOptions, kusto.ClientRequestID(clientRequestId), kusto.QueryParameters(kustoParameters))...) if err != nil { return nil, err } defer iter.Stop() var traceIds []model.TraceID err = iter.DoOnRowOrError( func(row *table.Row, e *errors.Error) error { if e != nil { return e } rec := TraceID{} if err := row.ToStruct(&rec); err != nil { return err } traceID, err := model.TraceIDFromString(rec.TraceID) traceIds = append(traceIds, traceID) return err }, ) if err != nil { return nil, err } return traceIds, err } // FindTraces finds and returns full traces with spans func (r *kustoSpanReader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { if err := validateQuery(query); err != nil { return nil, err } if query.NumTraces == 0 { query.NumTraces = defaultNumTraces } kustoStmt := kql.New("let TraceIDs = (").AddTable(r.tableName).AddLiteral(getTracesBaseQuery) kustoParameters := kql.NewParameters() if query.ServiceName != "" { kustoStmt = kustoStmt.AddLiteral(` | where ProcessServiceName == ParamProcessServiceName`) kustoParameters = kustoParameters.AddString("ParamProcessServiceName", query.ServiceName) } if query.OperationName != "" { kustoStmt = kustoStmt.AddLiteral(` | where SpanName == ParamOperationName`) kustoParameters = kustoParameters.AddString("ParamOperationName", query.OperationName) } if query.Tags != nil { for k, v := range query.Tags { tagFilter := fmt.Sprintf(" | where TraceAttributes['%s'] == '%s' or ResourceAttributes['%s'] == '%s'", k, v, k, v) kustoStmt = kustoStmt.AddUnsafe(tagFilter) } } kustoStmt = kustoStmt.AddLiteral(` | where StartTime > ParamStartTimeMin`) kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMin", query.StartTimeMin) kustoStmt = kustoStmt.AddLiteral(` | where StartTime < ParamStartTimeMax`) kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMax", query.StartTimeMax) if query.DurationMin != 0 { kustoStmt = kustoStmt.AddLiteral(` | where Duration > ParamDurationMin`) kustoParameters = kustoParameters.AddTimespan("ParamDurationMin", query.DurationMin) } if query.DurationMax != 0 { kustoStmt = kustoStmt.AddLiteral(` | where Duration > ParamDurationMax`) kustoParameters = kustoParameters.AddTimespan("ParamDurationMax", query.DurationMax) } kustoStmt = kustoStmt.AddLiteral(" | summarize by TraceID") kustoStmt = kustoStmt.AddLiteral(` | sample ParamNumTraces`) kustoParameters = kustoParameters.AddInt("ParamNumTraces", int32(query.NumTraces)) kustoStmt = kustoStmt.AddLiteral(`); `).AddTable(r.tableName).AddLiteral(getTracesBaseQuery) kustoStmt = kustoStmt.AddLiteral(` | where StartTime > ParamStartTimeMin`) kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMin", query.StartTimeMin) kustoStmt = kustoStmt.AddLiteral(` | where StartTime < ParamStartTimeMax`) kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMax", query.StartTimeMax) kustoStmt = kustoStmt.AddLiteral(` | where TraceID in (TraceIDs) | project-rename Tags=TraceAttributes,Logs=Events,ProcessTags=ResourceAttributes|extend References=iff(isempty(ParentID),todynamic("[]"),pack_array(bag_pack("refType","CHILD_OF","traceID",TraceID,"spanID",ParentID)))`) r.logger.Debug(kustoStmt.String()) clientRequestId := GetClientId() iter, err := r.client.Query(ctx, r.database, kustoStmt, append(r.defaultReadOptions, kusto.ClientRequestID(clientRequestId), kusto.QueryParameters(kustoParameters))...) if err != nil { return nil, err } defer iter.Stop() m := make(map[model.TraceID][]*model.Span) err = iter.DoOnRowOrError( func(row *table.Row, e *errors.Error) error { if e != nil { return e } rec := kustoSpan{} if err := row.ToStruct(&rec); err != nil { return err } var span *model.Span span, err = transformKustoSpanToModelSpan(&rec, r.logger) if err != nil { return err } m[span.TraceID] = append(m[span.TraceID], span) return nil }, ) var traces []*model.Trace for _, spanArray := range m { trace := model.Trace{Spans: spanArray} //r.logger.Debug("Trace ==> " + trace.String()) traces = append(traces, &trace) } return traces, err } // GetDependencies returns DependencyLinks of services func (r *kustoSpanReader) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { type kustoDependencyLink struct { Parent string `kusto:"Parent"` Child string `kusto:"Child"` CallCount value.Long `kusto:"CallCount"` } kustoStmt := kql.New(queryResultsCacheAge).AddTable(r.tableName).AddLiteral(getDependenciesQuery).AddTable(r.tableName).AddLiteral(getDependenciesJoinQuery) kustoParams := kql.NewParameters().AddDateTime("ParamEndTs", endTs).AddTimespan("ParamLookBack", lookback) clientRequestId := GetClientId() iter, err := r.client.Query(ctx, r.database, kustoStmt, append(r.defaultReadOptions, kusto.ClientRequestID(clientRequestId), kusto.QueryParameters(kustoParams))...) if err != nil { return nil, err } defer iter.Stop() var dependencyLinks []model.DependencyLink err = iter.DoOnRowOrError( func(row *table.Row, e *errors.Error) error { if e != nil { return e } rec := kustoDependencyLink{} if err := row.ToStruct(&rec); err != nil { return err } dependencyLinks = append(dependencyLinks, model.DependencyLink{ Parent: rec.Parent, Child: rec.Child, CallCount: uint64(rec.CallCount.Value), }) return nil }, ) return dependencyLinks, err }