func recordDatabaseQueryTextAndPlan()

in receiver/sqlserverreceiver/scraper.go [500:721]


func (s *sqlServerScraperHelper) recordDatabaseQueryTextAndPlan(ctx context.Context, topQueryCount uint) (plog.Logs, error) {
	// Constants are the column names returned by the top-query statistics query
	const (
		dbPrefix       = "sqlserver."
		executionCount = "execution_count"
		logicalReads   = "total_logical_reads"
		logicalWrites  = "total_logical_writes"
		physicalReads  = "total_physical_reads"
		queryHash      = "query_hash"
		queryPlan      = "query_plan"
		queryPlanHash  = "query_plan_hash"
		queryText      = "query_text"
		rowsReturned   = "total_rows"
		// the time returned from mssql is in microseconds
		totalElapsedTime = "total_elapsed_time"
		totalGrant       = "total_grant_kb"
		// the time returned from mssql is in microseconds
		totalWorkerTime = "total_worker_time"
	)
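	// Most of these columns come from SQL Server's sys.dm_exec_query_stats DMV (many of
	// them cumulative counters); query_text and query_plan are presumably joined in from
	// sys.dm_exec_sql_text and sys.dm_exec_query_plan by the underlying query.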

	rows, err := s.client.QueryRows(
		ctx,
		sql.Named("lookbackTime", -s.config.LookbackTime),
		sql.Named("topNValue", s.config.TopQueryCount),
		sql.Named("instanceName", s.config.InstanceName),
	)
	if err != nil {
		if !errors.Is(err, sqlquery.ErrNullValueWarning) {
			return plog.Logs{}, fmt.Errorf("sqlServerScraperHelper failed getting rows: %w", err)
		}
		s.logger.Warn("problems encountered getting log rows", zap.Error(err))
	}
	var errs []error

	totalElapsedTimeDiffsMicrosecond := make([]int64, len(rows))

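	// First pass: parse the cumulative total_elapsed_time of every row and compute the
	// delta against the value cached on the previous scrape (cacheAndDiff presumably
	// keys its cache on query hash, plan hash, and column name).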
	for i, row := range rows {
		queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
		queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))

		elapsedTimeMicrosecond, err := strconv.ParseInt(row[totalElapsedTime], 10, 64)
		if err != nil {
			s.logger.Info(fmt.Sprintf("sqlServerScraperHelper failed parsing %s. original value: %s, err: %s", totalElapsedTime, row[totalElapsedTime], err))
			errs = append(errs, err)
		} else {
			// We report the queries that consumed the most time since the previous scrape.
			// Cache the cumulative elapsed time (in microseconds) and diff it against the
			// value cached during the previous scrape.
			if cached, diff := s.cacheAndDiff(queryHashVal, queryPlanHashVal, totalElapsedTime, elapsedTimeMicrosecond); cached && diff > 0 {
				totalElapsedTimeDiffsMicrosecond[i] = diff
			}
		}
	}

	// Sort the rows by totalElapsedTimeDiffsMicrosecond in descending order and
	// only report the first topQueryCount rows.
	rows = sortRows(rows, totalElapsedTimeDiffsMicrosecond, topQueryCount)

	// sort totalElapsedTimeDiffsMicrosecond in descending order as well
	sort.Slice(totalElapsedTimeDiffsMicrosecond, func(i, j int) bool { return totalElapsedTimeDiffsMicrosecond[i] > totalElapsedTimeDiffsMicrosecond[j] })
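	// Sorting the diffs with the same descending order keeps totalElapsedTimeDiffsMicrosecond[i]
	// aligned with rows[i] in the loop below (assuming sortRows orders rows by these same diffs).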

	logs := plog.NewLogs()
	resourceLog := logs.ResourceLogs().AppendEmpty()

	scopedLog := resourceLog.ScopeLogs().AppendEmpty()
	scopedLog.Scope().SetName(metadata.ScopeName)
	scopedLog.Scope().SetVersion("0.0.1")

	timestamp := pcommon.NewTimestampFromTime(time.Now())
	for i, row := range rows {
		// totalElapsedTimeDiffsMicrosecond is sorted in descending order, so a zero delta
		// means this row and every remaining row either was not previously cached or
		// accumulated no elapsed time since the last scrape; stop here.
		if totalElapsedTimeDiffsMicrosecond[i] == 0 {
			break
		}

		// report the query hash and query plan hash as human-readable hex strings
		queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
		queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))

		record := scopedLog.LogRecords().AppendEmpty()
		record.SetTimestamp(timestamp)
		record.SetEventName("top query")

		attributes := []internalAttribute{
			{
				key:            computerNameKey,
				columnName:     computerNameKey,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:        "db.query.text",
				columnName: queryText,
				valueRetriever: func(row sqlquery.StringMap, columnName string) (any, error) {
					return obfuscateSQL(row[columnName])
				},
				valueSetter: setString,
			},
			{
				key:            dbPrefix + executionCount,
				columnName:     executionCount,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + logicalReads,
				columnName:     logicalReads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + logicalWrites,
				columnName:     logicalWrites,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + physicalReads,
				columnName:     physicalReads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + queryHash,
				valueRetriever: defaultValueRetriever(queryHashVal),
				valueSetter:    setString,
			},
			{
				key:        dbPrefix + queryPlan,
				columnName: queryPlan,
				valueRetriever: func(row sqlquery.StringMap, columnName string) (any, error) {
					return obfuscateXMLPlan(row[columnName])
				},
				valueSetter: setString,
			},
			{
				key:            dbPrefix + queryPlanHash,
				valueRetriever: defaultValueRetriever(queryPlanHashVal),
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + rowsReturned,
				columnName:     rowsReturned,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + totalElapsedTime,
				valueRetriever: defaultValueRetriever(float64(totalElapsedTimeDiffsMicrosecond[i]) / 1_000_000),
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + totalGrant,
				columnName:     totalGrant,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            "db.system.name",
				valueRetriever: defaultValueRetriever("microsoft.sql_server"),
				valueSetter:    setString,
			},
			{
				key:            instanceNameKey,
				columnName:     instanceNameKey,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            serverAddressKey,
				valueRetriever: defaultValueRetriever(s.config.Server),
				valueSetter:    setString,
			},
			{
				key:            serverPortKey,
				valueRetriever: defaultValueRetriever(int64(s.config.Port)),
				valueSetter:    setInt,
			},
		}

		updatedOnly := map[string]bool{
			rowsReturned:   true,
			logicalReads:   true,
			logicalWrites:  true,
			physicalReads:  true,
			executionCount: true,
			totalGrant:     true,
		}
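		// The columns above are cumulative counters, so only the delta since the previous
		// scrape is recorded for them (see the attribute loop below).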

		s.logger.Debug(fmt.Sprintf("QueryHash: %v, PlanHash: %v, DataRow: %v", queryHashVal, queryPlanHashVal, row))

		// Handle `total_worker_time` separately and report it in seconds.
		// It does not fit neatly into the attribute-array workflow above because the
		// value needs to be parsed, diffed against the cache, and then converted from
		// microseconds to seconds before being set.
		workerTimeMicrosecond, err := strconv.ParseInt(row[totalWorkerTime], 10, 64)
		if err != nil {
			err = fmt.Errorf("row %d: %w", i, err)
			errs = append(errs, err)
		} else {
			if cached, diffMicrosecond := s.cacheAndDiff(queryHashVal, queryPlanHashVal, totalWorkerTime, workerTimeMicrosecond); cached {
				record.Attributes().PutDouble(dbPrefix+totalWorkerTime, float64(diffMicrosecond)/1_000_000)
			}
		}

		for _, attr := range attributes {
			value, err := attr.valueRetriever(row, attr.columnName)
			if err != nil {
				s.logger.Error(fmt.Sprintf("sqlServerScraperHelper failed parsing %s. original value: %s, err: %s", attr.columnName, row[attr.columnName], err))
				errs = append(errs, err)
				// skip this attribute: a value that failed to retrieve should not be
				// recorded, and the int64 type assertion below could panic on it
				continue
			}
			if _, ok := updatedOnly[attr.columnName]; ok {
				if cached, diff := s.cacheAndDiff(queryHashVal, queryPlanHashVal, attr.columnName, value.(int64)); cached {
					attr.valueSetter(record.Attributes(), attr.key, diff)
				}
			} else {
				attr.valueSetter(record.Attributes(), attr.key, value)
			}
		}
	}

	return logs, errors.Join(errs...)
}
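
For readers unfamiliar with the helpers referenced above, here is a minimal sketch of the attribute-table machinery as it can be inferred from this function alone. The internalAttribute struct, defaultValueRetriever, vanillaRetriever, retrieveInt, and the set* functions are defined elsewhere in the package; their shapes below are assumptions reconstructed from usage (the retriever signature and field names appear in the literals above, the rest is inferred), not the receiver's actual implementation, and the import paths are assumed to match the collector-contrib layout.

package sqlserverreceiver

import (
	"strconv"

	"go.opentelemetry.io/collector/pdata/pcommon"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
)

// internalAttribute pairs an attribute key with the column it is read from and
// the functions that retrieve and set its value (sketch inferred from usage).
type internalAttribute struct {
	key            string
	columnName     string
	valueRetriever func(row sqlquery.StringMap, columnName string) (any, error)
	valueSetter    func(attributes pcommon.Map, key string, value any)
}

// defaultValueRetriever ignores the row and always returns the supplied value.
func defaultValueRetriever(value any) func(sqlquery.StringMap, string) (any, error) {
	return func(sqlquery.StringMap, string) (any, error) { return value, nil }
}

// vanillaRetriever returns the raw column value as a string.
func vanillaRetriever(row sqlquery.StringMap, columnName string) (any, error) {
	return row[columnName], nil
}

// retrieveInt parses the column value as a base-10 int64.
func retrieveInt(row sqlquery.StringMap, columnName string) (any, error) {
	return strconv.ParseInt(row[columnName], 10, 64)
}

// The setters write a retrieved value into the record's attribute map; the type
// assertions mirror what the corresponding retrievers return.
func setString(attrs pcommon.Map, key string, value any) { attrs.PutStr(key, value.(string)) }
func setInt(attrs pcommon.Map, key string, value any)    { attrs.PutInt(key, value.(int64)) }
func setDouble(attrs pcommon.Map, key string, value any) { attrs.PutDouble(key, value.(float64)) }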