func()

in store/reader.go [182:263]


func (r *kustoSpanReader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
	if err := validateQuery(query); err != nil {
		return nil, err
	}

	type TraceID struct {
		TraceID string `kusto:"TraceID"`
	}

	kustoStmt := kql.New("").AddTable(r.tableName).AddLiteral(getTraceIdBaseQuery)
	kustoParameters := kql.NewParameters()

	if query.ServiceName != "" {
		kustoStmt = kustoStmt.AddLiteral(` | where ProcessServiceName == ParamProcessServiceName`)
		kustoParameters = kustoParameters.AddString("ParamProcessServiceName", query.ServiceName)

	}

	if query.OperationName != "" {
		kustoStmt = kustoStmt.AddLiteral(` | where SpanName == ParamOperationName`)
		kustoParameters = kustoParameters.AddString("ParamOperationName", query.OperationName)
	}

	if query.Tags != nil {
		for k, v := range query.Tags {
			replacedTag := strings.ReplaceAll(k, ".", TagDotReplacementCharacter)
			tagFilter := fmt.Sprintf(" | where TraceAttributes['%s'] == '%s' or ResourceAttributes['%s'] == '%s'", replacedTag, v, replacedTag, v)
			kustoStmt = kustoStmt.AddUnsafe(tagFilter)
		}
	}

	kustoStmt = kustoStmt.AddLiteral(` | where StartTime > ParamStartTimeMin`)
	kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMin", query.StartTimeMin)

	kustoStmt = kustoStmt.AddLiteral(` | where StartTime < ParamStartTimeMax`)
	kustoParameters = kustoParameters.AddDateTime("ParamStartTimeMax", query.StartTimeMax)

	if query.DurationMin != 0 {
		kustoStmt = kustoStmt.AddLiteral(` | where Duration > ParamDurationMin`)
		kustoParameters = kustoParameters.AddTimespan("ParamDurationMin", query.DurationMin)
	}

	if query.DurationMax != 0 {
		kustoStmt = kustoStmt.AddLiteral(` | where Duration > ParamDurationMax`)
		kustoParameters = kustoParameters.AddTimespan("ParamDurationMax", query.DurationMax)
	}

	kustoStmt = kustoStmt.AddLiteral("| summarize by TraceID")

	if query.NumTraces != 0 {
		kustoStmt.AddLiteral(`| sample ParamNumTraces`)
		kustoParameters = kustoParameters.AddInt("ParamNumTraces", int32(query.NumTraces))
	}

	clientRequestId := GetClientId()
	iter, err := r.client.Query(ctx, r.database, kustoStmt, append(r.defaultReadOptions, kusto.ClientRequestID(clientRequestId), kusto.QueryParameters(kustoParameters))...)
	if err != nil {
		return nil, err
	}
	defer iter.Stop()

	var traceIds []model.TraceID
	err = iter.DoOnRowOrError(
		func(row *table.Row, e *errors.Error) error {
			if e != nil {
				return e
			}
			rec := TraceID{}
			if err := row.ToStruct(&rec); err != nil {
				return err
			}
			traceID, err := model.TraceIDFromString(rec.TraceID)
			traceIds = append(traceIds, traceID)
			return err
		},
	)
	if err != nil {
		return nil, err
	}

	return traceIds, err
}