in receiver/sqlserverreceiver/scraper.go [500:721]
// recordDatabaseQueryTextAndPlan queries the per-query statistics rows and
// turns the most expensive ones into log records.
//
// For every row the total elapsed time is compared with the value cached on a
// previous call (via cacheAndDiff). Rows are then ordered with sortRows by that
// difference, limited by topQueryCount, and each row with a non-zero
// difference becomes one "top query" log record. Counter-style columns (row
// count, reads, writes, execution count, memory grant) are emitted as the
// difference reported by cacheAndDiff rather than the raw total, and only when
// cacheAndDiff reports the value as cached.
//
// Note that the SQL parameter topNValue is taken from s.config.TopQueryCount,
// while the topQueryCount argument is only passed to sortRows.
//
// A query failure other than sqlquery.ErrNullValueWarning aborts with an empty
// plog.Logs. Per-row parsing problems are collected and returned joined
// together with whatever logs could still be built.
func (s *sqlServerScraperHelper) recordDatabaseQueryTextAndPlan(ctx context.Context, topQueryCount uint) (plog.Logs, error) {
	// Constants are the column names of the database status
	const (
		dbPrefix       = "sqlserver."
		executionCount = "execution_count"
		logicalReads   = "total_logical_reads"
		logicalWrites  = "total_logical_writes"
		physicalReads  = "total_physical_reads"
		queryHash      = "query_hash"
		queryPlan      = "query_plan"
		queryPlanHash  = "query_plan_hash"
		queryText      = "query_text"
		rowsReturned   = "total_rows"
		// the time returned from mssql is in microsecond
		totalElapsedTime = "total_elapsed_time"
		totalGrant       = "total_grant_kb"
		// the time returned from mssql is in microsecond
		totalWorkerTime = "total_worker_time"
	)

	rows, err := s.client.QueryRows(
		ctx,
		sql.Named("lookbackTime", -s.config.LookbackTime),
		sql.Named("topNValue", s.config.TopQueryCount),
		sql.Named("instanceName", s.config.InstanceName),
	)
	if err != nil {
		// A null-value warning still comes with usable rows; anything else is fatal.
		if !errors.Is(err, sqlquery.ErrNullValueWarning) {
			return plog.Logs{}, fmt.Errorf("sqlServerScraperHelper failed getting rows: %w", err)
		}
		s.logger.Warn("problems encountered getting log rows", zap.Error(err))
	}

	var errs []error

	// First pass: compute, per row, how much the total elapsed time changed
	// compared with the cached value. Rows that fail to parse keep a diff of 0.
	totalElapsedTimeDiffsMicrosecond := make([]int64, len(rows))
	for i, row := range rows {
		queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
		queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))

		elapsedTimeMicrosecond, err := strconv.ParseInt(row[totalElapsedTime], 10, 64)
		if err != nil {
			s.logger.Info(fmt.Sprintf("sqlServerScraperHelper failed getting rows: %s", err))
			errs = append(errs, err)
			continue
		}

		// we're trying to get the queries that used the most time.
		// caching the total elapsed time (in microsecond) and compare in the next scrape.
		if cached, diff := s.cacheAndDiff(queryHashVal, queryPlanHashVal, totalElapsedTime, elapsedTimeMicrosecond); cached && diff > 0 {
			totalElapsedTimeDiffsMicrosecond[i] = diff
		}
	}

	// sort the rows based on the totalElapsedTimeDiffs in descending order,
	// only report first T(T=topQueryCount) rows.
	rows = sortRows(rows, totalElapsedTimeDiffsMicrosecond, topQueryCount)

	// sort the totalElapsedTimeDiffs in descending order as well, so that
	// index i of the diffs lines up with index i of the sorted rows.
	sort.Slice(totalElapsedTimeDiffsMicrosecond, func(i, j int) bool {
		return totalElapsedTimeDiffsMicrosecond[i] > totalElapsedTimeDiffsMicrosecond[j]
	})

	logs := plog.NewLogs()
	resourceLog := logs.ResourceLogs().AppendEmpty()
	scopedLog := resourceLog.ScopeLogs().AppendEmpty()
	scopedLog.Scope().SetName(metadata.ScopeName)
	scopedLog.Scope().SetVersion("0.0.1")

	timestamp := pcommon.NewTimestampFromTime(time.Now())

	// Columns reported as the difference against the cached value instead of
	// the raw total. The set is the same for every row, so build it once.
	updatedOnly := map[string]bool{
		rowsReturned:   true,
		logicalReads:   true,
		logicalWrites:  true,
		physicalReads:  true,
		executionCount: true,
		totalGrant:     true,
	}

	for i, row := range rows {
		// skipping the rest of the rows as totalElapsedTimeDiffs is sorted in descending order
		if totalElapsedTimeDiffsMicrosecond[i] == 0 {
			break
		}

		// reporting human-readable query hash and query hash plan
		queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
		queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))

		record := scopedLog.LogRecords().AppendEmpty()
		record.SetTimestamp(timestamp)
		record.SetEventName("top query")

		attributes := []internalAttribute{
			{
				key:            computerNameKey,
				columnName:     computerNameKey,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:        "db.query.text",
				columnName: queryText,
				valueRetriever: func(row sqlquery.StringMap, columnName string) (any, error) {
					return obfuscateSQL(row[columnName])
				},
				valueSetter: setString,
			},
			{
				key:            dbPrefix + executionCount,
				columnName:     executionCount,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + logicalReads,
				columnName:     logicalReads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + logicalWrites,
				columnName:     logicalWrites,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + physicalReads,
				columnName:     physicalReads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + queryHash,
				valueRetriever: defaultValueRetriever(queryHashVal),
				valueSetter:    setString,
			},
			{
				key:        dbPrefix + queryPlan,
				columnName: queryPlan,
				valueRetriever: func(row sqlquery.StringMap, columnName string) (any, error) {
					return obfuscateXMLPlan(row[columnName])
				},
				valueSetter: setString,
			},
			{
				key:            dbPrefix + queryPlanHash,
				valueRetriever: defaultValueRetriever(queryPlanHashVal),
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + rowsReturned,
				columnName:     rowsReturned,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				// elapsed time diff is tracked in microseconds, reported in seconds
				key:            dbPrefix + totalElapsedTime,
				valueRetriever: defaultValueRetriever(float64(totalElapsedTimeDiffsMicrosecond[i]) / 1_000_000),
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + totalGrant,
				columnName:     totalGrant,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            "db.system.name",
				valueRetriever: defaultValueRetriever("microsoft.sql_server"),
				valueSetter:    setString,
			},
			{
				key:            instanceNameKey,
				columnName:     instanceNameKey,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            serverAddressKey,
				valueRetriever: defaultValueRetriever(s.config.Server),
				valueSetter:    setString,
			},
			{
				key:            serverPortKey,
				valueRetriever: defaultValueRetriever((int64(s.config.Port))),
				valueSetter:    setInt,
			},
		}

		s.logger.Debug(fmt.Sprintf("QueryHash: %v, PlanHash: %v, DataRow: %v", queryHashVal, queryPlanHashVal, row))

		// handle `total_worker_time`, storing seconds
		// it is a little bit tricky to put this to the array based workflow,
		// as the value need to be divided -> type assertion -> check cache.
		// hence handle it separately.
		workerTimeMicrosecond, err := strconv.ParseInt(row[totalWorkerTime], 10, 64)
		if err != nil {
			errs = append(errs, fmt.Errorf("row %d: %w", i, err))
		} else if cached, diffMicrosecond := s.cacheAndDiff(queryHashVal, queryPlanHashVal, totalWorkerTime, workerTimeMicrosecond); cached {
			record.Attributes().PutDouble(dbPrefix+totalWorkerTime, float64(diffMicrosecond)/1_000_000)
		}

		for _, attr := range attributes {
			value, err := attr.valueRetriever(row, attr.columnName)
			if err != nil {
				s.logger.Error(fmt.Sprintf("sqlServerScraperHelper failed parsing %s. original value: %s, err: %s", attr.columnName, row[attr.columnName], err))
				errs = append(errs, err)
			}

			if !updatedOnly[attr.columnName] {
				attr.valueSetter(record.Attributes(), attr.key, value)
				continue
			}

			// The retriever returns `any`; a failed retrieval may hand back a
			// nil or non-int64 value. Check the assertion instead of letting
			// it panic and take the whole scrape down.
			current, ok := value.(int64)
			if !ok {
				errs = append(errs, fmt.Errorf("row %d: column %s: expected int64 value, got %T", i, attr.columnName, value))
				continue
			}
			if cached, diff := s.cacheAndDiff(queryHashVal, queryPlanHashVal, attr.columnName, current); cached {
				attr.valueSetter(record.Attributes(), attr.key, diff)
			}
		}
	}

	return logs, errors.Join(errs...)
}