func()

in store/reader.go [266:365]


// FindTraces returns the traces whose spans match the given query parameters.
//
// The generated KQL statement has two stages:
//
//  1. A "let TraceIDs = (...)" sub-query that filters spans by service,
//     operation, tags, start-time window and duration, collapses the matches
//     to distinct TraceIDs and samples at most NumTraces of them
//     (defaultNumTraces when NumTraces is zero).
//  2. An outer query that fetches every span in the same start-time window
//     whose TraceID is in that set, renaming columns to the names the
//     kustoSpan row mapping expects.
//
// All caller-supplied values are passed either as declared query parameters
// or as escaped string literals; nothing user-controlled is concatenated into
// the statement verbatim. The query argument is not modified.
//
// On any validation, query or row-decoding error the returned slice is nil.
func (r *kustoSpanReader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
	if err := validateQuery(query); err != nil {
		return nil, err
	}

	// Apply the default on a local copy so the caller's struct is left untouched.
	numTraces := query.NumTraces
	if numTraces == 0 {
		numTraces = defaultNumTraces
	}

	kustoStmt := kql.New("let TraceIDs = (").AddTable(r.tableName).AddLiteral(getTracesBaseQuery)
	kustoParameters := kql.NewParameters()

	if query.ServiceName != "" {
		kustoStmt = kustoStmt.AddLiteral(` | where ProcessServiceName == ParamProcessServiceName`)
		kustoParameters = kustoParameters.AddString("ParamProcessServiceName", query.ServiceName)
	}

	if query.OperationName != "" {
		kustoStmt = kustoStmt.AddLiteral(` | where SpanName == ParamOperationName`)
		kustoParameters = kustoParameters.AddString("ParamOperationName", query.OperationName)
	}

	// Tag keys and values come straight from the caller, so they are emitted
	// through the builder's escaping AddString instead of being formatted into
	// the statement with AddUnsafe. Ranging over a nil map is a no-op.
	for k, v := range query.Tags {
		kustoStmt = kustoStmt.
			AddLiteral(" | where TraceAttributes[").AddString(k).
			AddLiteral("] == ").AddString(v).
			AddLiteral(" or ResourceAttributes[").AddString(k).
			AddLiteral("] == ").AddString(v)
	}

	// The start-time parameters are declared once and referenced by both stages.
	kustoStmt = kustoStmt.AddLiteral(` | where StartTime > ParamStartTimeMin`)
	kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMin", query.StartTimeMin)

	kustoStmt = kustoStmt.AddLiteral(` | where StartTime < ParamStartTimeMax`)
	kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMax", query.StartTimeMax)

	if query.DurationMin != 0 {
		kustoStmt = kustoStmt.AddLiteral(` | where Duration > ParamDurationMin`)
		kustoParameters = kustoParameters.AddTimespan("ParamDurationMin", query.DurationMin)
	}

	if query.DurationMax != 0 {
		// Upper bound: keep only spans shorter than DurationMax.
		kustoStmt = kustoStmt.AddLiteral(` | where Duration < ParamDurationMax`)
		kustoParameters = kustoParameters.AddTimespan("ParamDurationMax", query.DurationMax)
	}

	kustoStmt = kustoStmt.AddLiteral(" | summarize by TraceID")

	kustoStmt = kustoStmt.AddLiteral(` | sample ParamNumTraces`)
	kustoParameters = kustoParameters.AddInt("ParamNumTraces", int32(numTraces))

	// Second stage: fetch the full span rows for the selected trace IDs,
	// restricted to the same start-time window as the first stage.
	kustoStmt = kustoStmt.AddLiteral(`); `).AddTable(r.tableName).AddLiteral(getTracesBaseQuery)
	kustoStmt = kustoStmt.AddLiteral(` | where StartTime > ParamStartTimeMin`)
	kustoStmt = kustoStmt.AddLiteral(` | where StartTime < ParamStartTimeMax`)
	kustoStmt = kustoStmt.AddLiteral(` | where TraceID in (TraceIDs) | project-rename Tags=TraceAttributes,Logs=Events,ProcessTags=ResourceAttributes|extend References=iff(isempty(ParentID),todynamic("[]"),pack_array(bag_pack("refType","CHILD_OF","traceID",TraceID,"spanID",ParentID)))`)

	r.logger.Debug(kustoStmt.String())
	clientRequestID := GetClientId()

	// Cap the capacity of the shared default options so append always copies
	// and never writes into a backing array other calls may be using.
	base := r.defaultReadOptions
	opts := append(base[:len(base):len(base)], kusto.ClientRequestID(clientRequestID), kusto.QueryParameters(kustoParameters))

	iter, err := r.client.Query(ctx, r.database, kustoStmt, opts...)
	if err != nil {
		return nil, fmt.Errorf("executing find-traces query: %w", err)
	}
	defer iter.Stop()

	spansByTrace := make(map[model.TraceID][]*model.Span)

	err = iter.DoOnRowOrError(
		func(row *table.Row, e *errors.Error) error {
			if e != nil {
				return e
			}
			rec := kustoSpan{}
			if err := row.ToStruct(&rec); err != nil {
				return err
			}

			// Use a callback-local error so the enclosing function's err is
			// only ever set by the DoOnRowOrError return value.
			span, err := transformKustoSpanToModelSpan(&rec, r.logger)
			if err != nil {
				return err
			}
			spansByTrace[span.TraceID] = append(spansByTrace[span.TraceID], span)
			return nil
		},
	)
	if err != nil {
		// Do not hand back partially assembled traces alongside an error.
		return nil, fmt.Errorf("reading find-traces results: %w", err)
	}

	var traces []*model.Trace
	for _, spans := range spansByTrace {
		traces = append(traces, &model.Trace{Spans: spans})
	}
	return traces, nil
}