// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sqlserverreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlserverreceiver"

import (
	"container/heap"
	"context"
	"database/sql"
	"encoding/hex"
	"errors"
	"fmt"
	"sort"
	"strconv"
	"time"

	lru "github.com/hashicorp/golang-lru/v2"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/scraper"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlserverreceiver/internal/metadata"
)

const (
	computerNameKey  = "computer_name"
	instanceNameKey  = "sql_instance"
	serverAddressKey = "server.address"
	serverPortKey    = "server.port"
)

type sqlServerScraperHelper struct {
	id                 component.ID
	config             *Config
	sqlQuery           string
	instanceName       string
	clientProviderFunc sqlquery.ClientProviderFunc
	dbProviderFunc     sqlquery.DbProviderFunc
	logger             *zap.Logger
	telemetry          sqlquery.TelemetryConfig
	client             sqlquery.DbClient
	db                 *sql.DB
	mb                 *metadata.MetricsBuilder
	cache              *lru.Cache[string, int64]
}

var (
	_ scraper.Metrics = (*sqlServerScraperHelper)(nil)
	_ scraper.Logs    = (*sqlServerScraperHelper)(nil)
)

func newSQLServerScraper(id component.ID,
	query string,
	telemetry sqlquery.TelemetryConfig,
	dbProviderFunc sqlquery.DbProviderFunc,
	clientProviderFunc sqlquery.ClientProviderFunc,
	params receiver.Settings,
	cfg *Config,
	cache *lru.Cache[string, int64],
) *sqlServerScraperHelper {
	return &sqlServerScraperHelper{
		id:                 id,
		config:             cfg,
		sqlQuery:           query,
		logger:             params.Logger,
		telemetry:          telemetry,
		dbProviderFunc:     dbProviderFunc,
		clientProviderFunc: clientProviderFunc,
		mb:                 metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, params),
		cache:              cache,
	}
}

func (s *sqlServerScraperHelper) ID() component.ID {
	return s.id
}

func (s *sqlServerScraperHelper) Start(context.Context, component.Host) error {
	var err error
	s.db, err = s.dbProviderFunc()
	if err != nil {
		return fmt.Errorf("failed to open Db connection: %w", err)
	}
	s.client = s.clientProviderFunc(sqlquery.DbWrapper{Db: s.db}, s.sqlQuery, s.logger, s.telemetry)

	return nil
}

func (s *sqlServerScraperHelper) ScrapeMetrics(ctx context.Context) (pmetric.Metrics, error) {
	var err error

	switch s.sqlQuery {
	case getSQLServerDatabaseIOQuery(s.config.InstanceName):
		err = s.recordDatabaseIOMetrics(ctx)
	case getSQLServerPerformanceCounterQuery(s.config.InstanceName):
		err = s.recordDatabasePerfCounterMetrics(ctx)
	case getSQLServerPropertiesQuery(s.config.InstanceName):
		err = s.recordDatabaseStatusMetrics(ctx)
	default:
		return pmetric.Metrics{}, fmt.Errorf("attempted to get metrics from unsupported query: %s", s.sqlQuery)
	}

	if err != nil {
		return pmetric.Metrics{}, err
	}

	return s.mb.Emit(), nil
}

func (s *sqlServerScraperHelper) ScrapeLogs(ctx context.Context) (plog.Logs, error) {
	switch s.sqlQuery {
	case getSQLServerQueryTextAndPlanQuery():
		return s.recordDatabaseQueryTextAndPlan(ctx, s.config.TopQueryCount)
	case getSQLServerQuerySamplesQuery():
		return s.recordDatabaseSampleQuery(ctx)
	default:
		return plog.Logs{}, fmt.Errorf("attempted to get logs from unsupported query: %s", s.sqlQuery)
	}
}

func (s *sqlServerScraperHelper) Shutdown(_ context.Context) error {
	if s.db != nil {
		return s.db.Close()
	}
	return nil
}
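// Illustrative sketch (not part of the receiver): assembling and driving one
// scraper instance. cfg, params, ctx, and dsn are assumed to be in scope, and
// sqlquery.NewDbClient is assumed to satisfy sqlquery.ClientProviderFunc.
//
//	cache, _ := lru.New[string, int64](1000)
//	s := newSQLServerScraper(
//		component.MustNewID("sqlserver"),
//		getSQLServerDatabaseIOQuery(cfg.InstanceName), // selects recordDatabaseIOMetrics
//		sqlquery.TelemetryConfig{},
//		func() (*sql.DB, error) { return sql.Open("sqlserver", dsn) },
//		sqlquery.NewDbClient,
//		params, cfg, cache,
//	)
//	if err := s.Start(ctx, nil); err != nil {
//		// handle startup error
//	}
//	defer func() { _ = s.Shutdown(ctx) }()
//	metrics, err := s.ScrapeMetrics(ctx) // dispatches on s.sqlQuery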
func (s *sqlServerScraperHelper) recordDatabaseIOMetrics(ctx context.Context) error {
	const databaseNameKey = "database_name"
	const physicalFilenameKey = "physical_filename"
	const logicalFilenameKey = "logical_filename"
	const fileTypeKey = "file_type"
	const readLatencyMsKey = "read_latency_ms"
	const writeLatencyMsKey = "write_latency_ms"
	const readCountKey = "reads"
	const writeCountKey = "writes"
	const readBytesKey = "read_bytes"
	const writeBytesKey = "write_bytes"

	rows, err := s.client.QueryRows(ctx)
	if err != nil {
		if !errors.Is(err, sqlquery.ErrNullValueWarning) {
			return fmt.Errorf("sqlServerScraperHelper: %w", err)
		}
		s.logger.Warn("problems encountered getting metric rows", zap.Error(err))
	}

	var errs []error
	now := pcommon.NewTimestampFromTime(time.Now())
	var val float64
	for i, row := range rows {
		rb := s.mb.NewResourceBuilder()
		rb.SetSqlserverComputerName(row[computerNameKey])
		rb.SetSqlserverDatabaseName(row[databaseNameKey])
		rb.SetSqlserverInstanceName(row[instanceNameKey])
		rb.SetServerAddress(s.config.Server)
		rb.SetServerPort(int64(s.config.Port))

		val, err = strconv.ParseFloat(row[readLatencyMsKey], 64)
		if err != nil {
			err = fmt.Errorf("row %d: %w", i, err)
			errs = append(errs, err)
		} else {
			// the latency columns are in milliseconds; the metric is recorded in seconds
			s.mb.RecordSqlserverDatabaseLatencyDataPoint(now, val/1e3, row[physicalFilenameKey], row[logicalFilenameKey], row[fileTypeKey], metadata.AttributeDirectionRead)
		}

		val, err = strconv.ParseFloat(row[writeLatencyMsKey], 64)
		if err != nil {
			err = fmt.Errorf("row %d: %w", i, err)
			errs = append(errs, err)
		} else {
			s.mb.RecordSqlserverDatabaseLatencyDataPoint(now, val/1e3, row[physicalFilenameKey], row[logicalFilenameKey], row[fileTypeKey], metadata.AttributeDirectionWrite)
		}

		errs = append(errs, s.mb.RecordSqlserverDatabaseOperationsDataPoint(now, row[readCountKey], row[physicalFilenameKey], row[logicalFilenameKey], row[fileTypeKey], metadata.AttributeDirectionRead))
		errs = append(errs, s.mb.RecordSqlserverDatabaseOperationsDataPoint(now, row[writeCountKey], row[physicalFilenameKey], row[logicalFilenameKey], row[fileTypeKey], metadata.AttributeDirectionWrite))
		errs = append(errs, s.mb.RecordSqlserverDatabaseIoDataPoint(now, row[readBytesKey], row[physicalFilenameKey], row[logicalFilenameKey], row[fileTypeKey], metadata.AttributeDirectionRead))
		errs = append(errs, s.mb.RecordSqlserverDatabaseIoDataPoint(now, row[writeBytesKey], row[physicalFilenameKey], row[logicalFilenameKey], row[fileTypeKey], metadata.AttributeDirectionWrite))

		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}

	if len(rows) == 0 {
		s.logger.Info("SQLServerScraperHelper: No rows found by query")
	}

	return errors.Join(errs...)
}
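// Illustrative sketch (not part of the receiver): the shape of one row as
// consumed by recordDatabaseIOMetrics above. Every value arrives as a string
// in a sqlquery.StringMap; the figures below are made up.
//
//	row := sqlquery.StringMap{
//		"computer_name":     "DB01",
//		"sql_instance":      "MSSQLSERVER",
//		"database_name":     "master",
//		"physical_filename": `C:\data\master.mdf`,
//		"logical_filename":  "master",
//		"file_type":         "ROWS",
//		"read_latency_ms":   "12", // recorded as 0.012 s
//		"write_latency_ms":  "3",  // recorded as 0.003 s
//		"reads":             "42",
//		"writes":            "7",
//		"read_bytes":        "65536",
//		"write_bytes":       "8192",
//	}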
func (s *sqlServerScraperHelper) recordDatabasePerfCounterMetrics(ctx context.Context) error {
	const counterKey = "counter"
	const valueKey = "value"
	// Constants are the counter names for metrics from the query
	const activeTempTables = "Active Temp Tables"
	const backupRestoreThroughputPerSec = "Backup/Restore Throughput/sec"
	const batchRequestRate = "Batch Requests/sec"
	const bufferCacheHitRatio = "Buffer cache hit ratio"
	const bytesReceivedFromReplicaPerSec = "Bytes Received from Replica/sec"
	const bytesSentForReplicaPerSec = "Bytes Sent to Replica/sec"
	const diskReadIOThrottled = "Disk Read IO Throttled/sec"
	const diskWriteIOThrottled = "Disk Write IO Throttled/sec"
	const executionErrors = "Execution Errors"
	const freeListStalls = "Free list stalls/sec"
	const freeSpaceInTempdb = "Free Space in tempdb (KB)"
	const fullScansPerSec = "Full Scans/sec"
	const indexSearchesPerSec = "Index Searches/sec"
	const lockTimeoutsPerSec = "Lock Timeouts/sec"
	const lockWaits = "Lock Waits/sec"
	const loginsPerSec = "Logins/sec"
	const logoutPerSec = "Logouts/sec"
	const numberOfDeadlocksPerSec = "Number of Deadlocks/sec"
	const mirrorWritesTransactionPerSec = "Mirrored Write Transactions/sec"
	const memoryGrantsPending = "Memory Grants Pending"
	const pageLookupsPerSec = "Page lookups/sec"
	const processesBlocked = "Processes blocked"
	const sqlCompilationRate = "SQL Compilations/sec"
	const sqlReCompilationsRate = "SQL Re-Compilations/sec"
	const transactionDelay = "Transaction Delay"
	const userConnCount = "User Connections"
	const usedMemory = "Used memory (KB)"
	const versionStoreSize = "Version Store Size (KB)"

	rows, err := s.client.QueryRows(ctx)
	if err != nil {
		if !errors.Is(err, sqlquery.ErrNullValueWarning) {
			return fmt.Errorf("sqlServerScraperHelper: %w", err)
		}
		s.logger.Warn("problems encountered getting metric rows", zap.Error(err))
	}

	var errs []error
	now := pcommon.NewTimestampFromTime(time.Now())
	for i, row := range rows {
		rb := s.mb.NewResourceBuilder()
		rb.SetSqlserverComputerName(row[computerNameKey])
		rb.SetSqlserverInstanceName(row[instanceNameKey])
		rb.SetServerAddress(s.config.Server)
		rb.SetServerPort(int64(s.config.Port))

		switch row[counterKey] {
		case activeTempTables:
			val, err := strconv.ParseInt(row[valueKey], 10, 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, activeTempTables)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverTableCountDataPoint(now, val, metadata.AttributeTableStateActive, metadata.AttributeTableStatusTemporary)
			}
		case backupRestoreThroughputPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, backupRestoreThroughputPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverDatabaseBackupOrRestoreRateDataPoint(now, val)
			}
		case batchRequestRate:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, batchRequestRate)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverBatchRequestRateDataPoint(now, val)
			}
		case bufferCacheHitRatio:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, bufferCacheHitRatio)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverPageBufferCacheHitRatioDataPoint(now, val)
			}
		case bytesReceivedFromReplicaPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, bytesReceivedFromReplicaPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverReplicaDataRateDataPoint(now, val, metadata.AttributeReplicaDirectionReceive)
			}
		case bytesSentForReplicaPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, bytesSentForReplicaPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverReplicaDataRateDataPoint(now, val, metadata.AttributeReplicaDirectionTransmit)
			}
		case diskReadIOThrottled:
			errs = append(errs, s.mb.RecordSqlserverResourcePoolDiskThrottledReadRateDataPoint(now, row[valueKey]))
		case diskWriteIOThrottled:
			errs = append(errs, s.mb.RecordSqlserverResourcePoolDiskThrottledWriteRateDataPoint(now, row[valueKey]))
		case executionErrors:
			val, err := strconv.ParseInt(row[valueKey], 10, 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, executionErrors)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverDatabaseExecutionErrorsDataPoint(now, val)
			}
		case freeListStalls:
			val, err := strconv.ParseInt(row[valueKey], 10, 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, freeListStalls)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverPageBufferCacheFreeListStallsRateDataPoint(now, val)
			}
		case fullScansPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, fullScansPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverDatabaseFullScanRateDataPoint(now, val)
			}
		case freeSpaceInTempdb:
			val, err := strconv.ParseInt(row[valueKey], 10, 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, freeSpaceInTempdb)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverDatabaseTempdbSpaceDataPoint(now, val, metadata.AttributeTempdbStateFree)
			}
		case indexSearchesPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, indexSearchesPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverIndexSearchRateDataPoint(now, val)
			}
		case lockTimeoutsPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, lockTimeoutsPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverLockTimeoutRateDataPoint(now, val)
			}
		case lockWaits:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, lockWaits)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverLockWaitRateDataPoint(now, val)
			}
		case loginsPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, loginsPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverLoginRateDataPoint(now, val)
			}
		case logoutPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, logoutPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverLogoutRateDataPoint(now, val)
			}
		case memoryGrantsPending:
			val, err := strconv.ParseInt(row[valueKey], 10, 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, memoryGrantsPending)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverMemoryGrantsPendingCountDataPoint(now, val)
			}
		case mirrorWritesTransactionPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, mirrorWritesTransactionPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverTransactionMirrorWriteRateDataPoint(now, val)
			}
		case numberOfDeadlocksPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, numberOfDeadlocksPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverDeadlockRateDataPoint(now, val)
			}
		case pageLookupsPerSec:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, pageLookupsPerSec)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverPageLookupRateDataPoint(now, val)
			}
		case processesBlocked:
			errs = append(errs, s.mb.RecordSqlserverProcessesBlockedDataPoint(now, row[valueKey]))
		case sqlCompilationRate:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, sqlCompilationRate)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverBatchSQLCompilationRateDataPoint(now, val)
			}
		case sqlReCompilationsRate:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, sqlReCompilationsRate)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverBatchSQLRecompilationRateDataPoint(now, val)
			}
		case transactionDelay:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, transactionDelay)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverTransactionDelayDataPoint(now, val)
			}
		case userConnCount:
			val, err := strconv.ParseInt(row[valueKey], 10, 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, userConnCount)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverUserConnectionCountDataPoint(now, val)
			}
		case usedMemory:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, usedMemory)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverMemoryUsageDataPoint(now, val)
			}
		case versionStoreSize:
			val, err := strconv.ParseFloat(row[valueKey], 64)
			if err != nil {
				err = fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, versionStoreSize)
				errs = append(errs, err)
			} else {
				s.mb.RecordSqlserverDatabaseTempdbVersionStoreSizeDataPoint(now, val)
			}
		}

		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}

	return errors.Join(errs...)
}
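// Illustrative sketch (not part of the receiver): the per-counter pattern used
// in the switch above. A hypothetical new counter would follow the same shape:
// parse row[valueKey], collect the parse error, or record the data point.
//
//	case "Some New Counter": // hypothetical counter name
//		val, err := strconv.ParseFloat(row[valueKey], 64)
//		if err != nil {
//			errs = append(errs, fmt.Errorf("failed to parse valueKey for row %d: %w in %s", i, err, "Some New Counter"))
//		} else {
//			s.mb.RecordSqlserverSomeNewCounterDataPoint(now, val) // hypothetical generated recorder
//		}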
func (s *sqlServerScraperHelper) recordDatabaseStatusMetrics(ctx context.Context) error {
	// Constants are the column names of the database status
	const dbOnline = "db_online"
	const dbRestoring = "db_restoring"
	const dbRecovering = "db_recovering"
	const dbPendingRecovery = "db_recoveryPending"
	const dbSuspect = "db_suspect"
	const dbOffline = "db_offline"

	rows, err := s.client.QueryRows(ctx)
	if err != nil {
		if !errors.Is(err, sqlquery.ErrNullValueWarning) {
			return fmt.Errorf("sqlServerScraperHelper failed getting metric rows: %w", err)
		}
		s.logger.Warn("problems encountered getting metric rows", zap.Error(err))
	}

	var errs []error
	now := pcommon.NewTimestampFromTime(time.Now())
	for _, row := range rows {
		rb := s.mb.NewResourceBuilder()
		rb.SetSqlserverComputerName(row[computerNameKey])
		rb.SetSqlserverInstanceName(row[instanceNameKey])
		rb.SetServerAddress(s.config.Server)
		rb.SetServerPort(int64(s.config.Port))

		errs = append(errs, s.mb.RecordSqlserverDatabaseCountDataPoint(now, row[dbOnline], metadata.AttributeDatabaseStatusOnline))
		errs = append(errs, s.mb.RecordSqlserverDatabaseCountDataPoint(now, row[dbRestoring], metadata.AttributeDatabaseStatusRestoring))
		errs = append(errs, s.mb.RecordSqlserverDatabaseCountDataPoint(now, row[dbRecovering], metadata.AttributeDatabaseStatusRecovering))
		errs = append(errs, s.mb.RecordSqlserverDatabaseCountDataPoint(now, row[dbPendingRecovery], metadata.AttributeDatabaseStatusPendingRecovery))
		errs = append(errs, s.mb.RecordSqlserverDatabaseCountDataPoint(now, row[dbSuspect], metadata.AttributeDatabaseStatusSuspect))
		errs = append(errs, s.mb.RecordSqlserverDatabaseCountDataPoint(now, row[dbOffline], metadata.AttributeDatabaseStatusOffline))

		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}

	return errors.Join(errs...)
}
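// recordDatabaseQueryTextAndPlan reads cumulative query statistics, diffs them
// against the values cached from the previous scrape, and emits one log record
// per query for the topQueryCount queries whose total elapsed time grew the most.
//
// Illustrative sketch (not part of the receiver) of the selection step, with
// made-up numbers:
//
//	// rows: [q1, q2, q3]; elapsed-time growth since the last scrape: [100, 10, 1000]
//	rows = sortRows(rows, diffs, 2) // -> [q3, q1]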
func (s *sqlServerScraperHelper) recordDatabaseQueryTextAndPlan(ctx context.Context, topQueryCount uint) (plog.Logs, error) {
	// Constants are the column names of the query stats
	const (
		dbPrefix       = "sqlserver."
		executionCount = "execution_count"
		logicalReads   = "total_logical_reads"
		logicalWrites  = "total_logical_writes"
		physicalReads  = "total_physical_reads"
		queryHash      = "query_hash"
		queryPlan      = "query_plan"
		queryPlanHash  = "query_plan_hash"
		queryText      = "query_text"
		rowsReturned   = "total_rows"
		// the time returned from mssql is in microseconds
		totalElapsedTime = "total_elapsed_time"
		totalGrant       = "total_grant_kb"
		// the time returned from mssql is in microseconds
		totalWorkerTime = "total_worker_time"
	)
	rows, err := s.client.QueryRows(
		ctx,
		sql.Named("lookbackTime", -s.config.LookbackTime),
		sql.Named("topNValue", s.config.TopQueryCount),
		sql.Named("instanceName", s.config.InstanceName),
	)
	if err != nil {
		if !errors.Is(err, sqlquery.ErrNullValueWarning) {
			return plog.Logs{}, fmt.Errorf("sqlServerScraperHelper failed getting rows: %w", err)
		}
		s.logger.Warn("problems encountered getting log rows", zap.Error(err))
	}

	var errs []error

	totalElapsedTimeDiffsMicrosecond := make([]int64, len(rows))

	for i, row := range rows {
		queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
		queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))

		elapsedTimeMicrosecond, err := strconv.ParseInt(row[totalElapsedTime], 10, 64)
		if err != nil {
			s.logger.Info(fmt.Sprintf("sqlServerScraperHelper failed getting rows: %s", err))
			errs = append(errs, err)
		} else {
			// we're trying to find the queries that used the most time,
			// so cache the total elapsed time (in microseconds) and compare it on the next scrape.
			if cached, diff := s.cacheAndDiff(queryHashVal, queryPlanHashVal, totalElapsedTime, elapsedTimeMicrosecond); cached && diff > 0 {
				totalElapsedTimeDiffsMicrosecond[i] = diff
			}
		}
	}

	// sort the rows based on totalElapsedTimeDiffsMicrosecond in descending order,
	// and report only the first topQueryCount rows.
	rows = sortRows(rows, totalElapsedTimeDiffsMicrosecond, topQueryCount)

	// sort totalElapsedTimeDiffsMicrosecond in descending order as well
	sort.Slice(totalElapsedTimeDiffsMicrosecond, func(i, j int) bool {
		return totalElapsedTimeDiffsMicrosecond[i] > totalElapsedTimeDiffsMicrosecond[j]
	})

	logs := plog.NewLogs()
	resourceLog := logs.ResourceLogs().AppendEmpty()
	scopedLog := resourceLog.ScopeLogs().AppendEmpty()
	scopedLog.Scope().SetName(metadata.ScopeName)
	scopedLog.Scope().SetVersion("0.0.1")

	timestamp := pcommon.NewTimestampFromTime(time.Now())
	for i, row := range rows {
		// skip the rest of the rows, since totalElapsedTimeDiffsMicrosecond is sorted in descending order
		if totalElapsedTimeDiffsMicrosecond[i] == 0 {
			break
		}

		// report human-readable query hash and query plan hash
		queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
		queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))

		record := scopedLog.LogRecords().AppendEmpty()
		record.SetTimestamp(timestamp)
		record.SetEventName("top query")

		attributes := []internalAttribute{
			{
				key:            computerNameKey,
				columnName:     computerNameKey,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:        "db.query.text",
				columnName: queryText,
				valueRetriever: func(row sqlquery.StringMap, columnName string) (any, error) {
					return obfuscateSQL(row[columnName])
				},
				valueSetter: setString,
			},
			{
				key:            dbPrefix + executionCount,
				columnName:     executionCount,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + logicalReads,
				columnName:     logicalReads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + logicalWrites,
				columnName:     logicalWrites,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + physicalReads,
				columnName:     physicalReads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + queryHash,
				valueRetriever: defaultValueRetriever(queryHashVal),
				valueSetter:    setString,
			},
			{
				key:        dbPrefix + queryPlan,
				columnName: queryPlan,
				valueRetriever: func(row sqlquery.StringMap, columnName string) (any, error) {
					return obfuscateXMLPlan(row[columnName])
				},
				valueSetter: setString,
			},
			{
				key:            dbPrefix + queryPlanHash,
				valueRetriever: defaultValueRetriever(queryPlanHashVal),
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + rowsReturned,
				columnName:     rowsReturned,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + totalElapsedTime,
				valueRetriever: defaultValueRetriever(float64(totalElapsedTimeDiffsMicrosecond[i]) / 1_000_000),
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + totalGrant,
				columnName:     totalGrant,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            "db.system.name",
				valueRetriever: defaultValueRetriever("microsoft.sql_server"),
				valueSetter:    setString,
			},
			{
				key:            instanceNameKey,
				columnName:     instanceNameKey,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            serverAddressKey,
				valueRetriever: defaultValueRetriever(s.config.Server),
				valueSetter:    setString,
			},
			{
				key:            serverPortKey,
				valueRetriever: defaultValueRetriever(int64(s.config.Port)),
				valueSetter:    setInt,
			},
		}

		// these column values are reported as growth since the previous scrape;
		// on a cache miss (first sighting) the attribute is omitted.
		updatedOnly := map[string]bool{
			rowsReturned:   true,
			logicalReads:   true,
			logicalWrites:  true,
			physicalReads:  true,
			executionCount: true,
			totalGrant:     true,
		}

		s.logger.Debug(fmt.Sprintf("QueryHash: %v, PlanHash: %v, DataRow: %v", queryHashVal, queryPlanHashVal, row))

		// handle `total_worker_time`, storing seconds.
		// it is a little tricky to fit this into the array-based workflow, as the
		// value needs to be divided -> type-asserted -> checked against the cache,
		// hence it is handled separately.
		workerTimeMicrosecond, err := strconv.ParseInt(row[totalWorkerTime], 10, 64)
		if err != nil {
			err = fmt.Errorf("row %d: %w", i, err)
			errs = append(errs, err)
		} else {
			if cached, diffMicrosecond := s.cacheAndDiff(queryHashVal, queryPlanHashVal, totalWorkerTime, workerTimeMicrosecond); cached {
				record.Attributes().PutDouble(dbPrefix+totalWorkerTime, float64(diffMicrosecond)/1_000_000)
			}
		}

		for _, attr := range attributes {
			value, err := attr.valueRetriever(row, attr.columnName)
			if err != nil {
				s.logger.Error(fmt.Sprintf("sqlServerScraperHelper failed parsing %s. original value: %s, err: %s", attr.columnName, row[attr.columnName], err))
				errs = append(errs, err)
			}
			if _, ok := updatedOnly[attr.columnName]; ok {
				if cached, diff := s.cacheAndDiff(queryHashVal, queryPlanHashVal, attr.columnName, value.(int64)); cached {
					attr.valueSetter(record.Attributes(), attr.key, diff)
				}
			} else {
				attr.valueSetter(record.Attributes(), attr.key, value)
			}
		}
	}

	return logs, errors.Join(errs...)
}
// cacheAndDiff caches the given value, keyed by query hash, query plan hash, and column name.
// It returns:
// (1) whether the key was already cached before this call, and
// (2) the positive difference if the new value is larger than the cached one
// (the new value itself on a cache miss, 0 otherwise).
func (s *sqlServerScraperHelper) cacheAndDiff(queryHash string, queryPlanHash string, column string, val int64) (bool, int64) {
	if val < 0 {
		return false, 0
	}

	key := queryHash + "-" + queryPlanHash + "-" + column

	cached, ok := s.cache.Get(key)
	if !ok {
		s.cache.Add(key, val)
		return false, val
	}

	if val > cached {
		s.cache.Add(key, val)
		return true, val - cached
	}

	return true, 0
}
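// Illustrative sketch (not part of the receiver): cacheAndDiff across three
// calls for the same key. The first observation primes the cache; later calls
// yield the growth. Numbers are made up.
//
//	cached, diff := s.cacheAndDiff("qh", "ph", "execution_count", 10)
//	// cached == false, diff == 10 (first sighting; 10 is stored)
//	cached, diff = s.cacheAndDiff("qh", "ph", "execution_count", 25)
//	// cached == true, diff == 15 (25 - 10; 25 is stored)
//	cached, diff = s.cacheAndDiff("qh", "ph", "execution_count", 25)
//	// cached == true, diff == 0 (no growth since the last scrape)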
type Item struct {
	row      sqlquery.StringMap
	priority int64
	index    int
}

// reference: https://pkg.go.dev/container/heap#example-package-PriorityQueue
type priorityQueue []*Item

func (pq priorityQueue) Len() int { return len(pq) }

func (pq priorityQueue) Less(i, j int) bool {
	return pq[i].priority > pq[j].priority
}

func (pq priorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

func (pq *priorityQueue) Push(x any) {
	n := len(*pq)
	item := x.(*Item)
	item.index = n
	*pq = append(*pq, item)
}

func (pq *priorityQueue) Pop() any {
	old := *pq
	n := len(old)
	item := old[n-1]
	old[n-1] = nil  // don't stop the GC from reclaiming the item eventually
	item.index = -1 // for safety
	*pq = old[0 : n-1]
	return item
}

// sortRows sorts the rows based on the `values` slice in descending order and
// returns the first `maximum` rows.
// Input: rows: [row1, row2, row3], values: [100, 10, 1000], maximum: 2
// Output: [row3, row1]
func sortRows(rows []sqlquery.StringMap, values []int64, maximum uint) []sqlquery.StringMap {
	results := make([]sqlquery.StringMap, 0)

	if len(rows) == 0 || len(values) == 0 || len(rows) != len(values) || maximum == 0 {
		return []sqlquery.StringMap{}
	}

	pq := make(priorityQueue, len(rows))
	for i, row := range rows {
		value := values[i]
		pq[i] = &Item{
			row:      row,
			priority: value,
			index:    i,
		}
	}
	heap.Init(&pq)

	for pq.Len() > 0 && len(results) < int(maximum) {
		item := heap.Pop(&pq).(*Item)
		results = append(results, item.row)
	}
	return results
}

type internalAttribute struct {
	key            string
	columnName     string
	valueRetriever func(row sqlquery.StringMap, columnName string) (any, error)
	valueSetter    func(attributes pcommon.Map, key string, value any)
}

func defaultValueRetriever(defaultValue any) func(row sqlquery.StringMap, columnName string) (any, error) {
	return func(_ sqlquery.StringMap, _ string) (any, error) {
		return defaultValue, nil
	}
}

func vanillaRetriever(row sqlquery.StringMap, columnName string) (any, error) {
	return row[columnName], nil
}

func retrieveInt(row sqlquery.StringMap, columnName string) (any, error) {
	var err error
	result := 0
	if row[columnName] != "" {
		result, err = strconv.Atoi(row[columnName])
	}
	return int64(result), err
}

func retrieveIntAndConvert(convert func(int64) any) func(row sqlquery.StringMap, columnName string) (any, error) {
	return func(row sqlquery.StringMap, columnName string) (any, error) {
		result, err := retrieveInt(row, columnName)
		// convert even if retrieval failed, so the setter still receives the expected type
		return convert(result.(int64)), err
	}
}

func retrieveFloat(row sqlquery.StringMap, columnName string) (any, error) {
	var err error
	var result float64
	if row[columnName] != "" {
		result, err = strconv.ParseFloat(row[columnName], 64)
	}
	return result, err
}

func setString(attributes pcommon.Map, key string, value any) {
	attributes.PutStr(key, value.(string))
}

func setInt(attributes pcommon.Map, key string, value any) {
	attributes.PutInt(key, value.(int64))
}

func setDouble(attributes pcommon.Map, key string, value any) {
	attributes.PutDouble(key, value.(float64))
}
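// Illustrative sketch (not part of the receiver): composing a retriever with a
// setter, as the attribute tables in this file do. The row content is made up.
//
//	attrs := pcommon.NewMap()
//	row := sqlquery.StringMap{"cpu_time": "1500"} // milliseconds, as a string
//	toSeconds := retrieveIntAndConvert(func(i int64) any { return float64(i) / 1000.0 })
//	v, err := toSeconds(row, "cpu_time") // float64(1.5)
//	if err == nil {
//		setDouble(attrs, "sqlserver.cpu_time", v)
//	}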
func (s *sqlServerScraperHelper) recordDatabaseSampleQuery(ctx context.Context) (plog.Logs, error) {
	const blockingSessionID = "blocking_session_id"
	const clientAddress = "client_address"
	const clientPort = "client_port"
	const command = "command"
	const contextInfo = "context_info"
	const cpuTimeMillisecond = "cpu_time"
	const dbName = "db_name"
	const dbPrefix = "sqlserver."
	const deadlockPriority = "deadlock_priority"
	const estimatedCompletionTimeMillisecond = "estimated_completion_time"
	const hostName = "host_name"
	const lockTimeoutMillisecond = "lock_timeout"
	const logicalReads = "logical_reads"
	const openTransactionCount = "open_transaction_count"
	const percentComplete = "percent_complete"
	const queryHash = "query_hash"
	const queryPlanHash = "query_plan_hash"
	const queryStart = "query_start"
	const reads = "reads"
	const requestStatus = "request_status"
	const rowCount = "row_count"
	const sessionID = "session_id"
	const sessionStatus = "session_status"
	const statementText = "statement_text"
	const totalElapsedTimeMillisecond = "total_elapsed_time"
	const transactionID = "transaction_id"
	const transactionIsolationLevel = "transaction_isolation_level"
	const username = "username"
	const waitResource = "wait_resource"
	const waitTimeMillisecond = "wait_time"
	const waitType = "wait_type"
	const writes = "writes"

	rows, err := s.client.QueryRows(
		ctx,
		sql.Named("top", s.config.TopQueryCount),
	)
	if err != nil {
		if !errors.Is(err, sqlquery.ErrNullValueWarning) {
			return plog.Logs{}, fmt.Errorf("sqlServerScraperHelper failed getting log rows: %w", err)
		}
		// in case the returned rows contain null values, we just log a warning and continue
		s.logger.Warn("problems encountered getting log rows", zap.Error(err))
	}
	var errs []error

	logs := plog.NewLogs()
	resourceLog := logs.ResourceLogs().AppendEmpty()

	scopedLog := resourceLog.ScopeLogs().AppendEmpty()
	scopedLog.Scope().SetName(metadata.ScopeName)
	scopedLog.Scope().SetVersion("v0.0.1")

	for _, row := range rows {
		queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
		queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))
		contextInfoVal := hex.EncodeToString([]byte(row[contextInfo]))

		record := scopedLog.LogRecords().AppendEmpty()
		record.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))

		// Attributes sorted alphabetically by key
		attributes := []internalAttribute{
			{
				key:            "client.port",
				columnName:     clientPort,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            "db.namespace",
				columnName:     dbName,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:        "db.query.text",
				columnName: statementText,
				valueRetriever: func(row sqlquery.StringMap, columnName string) (any, error) {
					return obfuscateSQL(row[columnName])
				},
				valueSetter: setString,
			},
			{
				key:            "db.system.name",
				valueRetriever: defaultValueRetriever("microsoft.sql_server"),
				valueSetter:    setString,
			},
			{
				key:            "network.peer.address",
				columnName:     clientAddress,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            "network.peer.port",
				columnName:     clientPort,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			// the following attributes are not in the semantic conventions
			{
				key:            dbPrefix + blockingSessionID,
				columnName:     blockingSessionID,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + command,
				columnName:     command,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + contextInfo,
				valueRetriever: defaultValueRetriever(contextInfoVal),
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + cpuTimeMillisecond,
				columnName:     cpuTimeMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any { return float64(i) / 1000.0 }),
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + deadlockPriority,
				columnName:     deadlockPriority,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + estimatedCompletionTimeMillisecond,
				columnName:     estimatedCompletionTimeMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any { return float64(i) / 1000.0 }),
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + lockTimeoutMillisecond,
				columnName:     lockTimeoutMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any { return float64(i) / 1000.0 }),
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + logicalReads,
				columnName:     logicalReads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + openTransactionCount,
				columnName:     openTransactionCount,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + percentComplete,
				columnName:     percentComplete,
				valueRetriever: retrieveFloat,
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + queryHash,
				valueRetriever: defaultValueRetriever(queryHashVal),
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + queryPlanHash,
				valueRetriever: defaultValueRetriever(queryPlanHashVal),
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + queryStart,
				columnName:     queryStart,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + reads,
				columnName:     reads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + requestStatus,
				columnName:     requestStatus,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + rowCount,
				columnName:     rowCount,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + sessionID,
				columnName:     sessionID,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + sessionStatus,
				columnName:     sessionStatus,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + totalElapsedTimeMillisecond,
				columnName:     totalElapsedTimeMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any { return float64(i) / 1000.0 }),
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + transactionID,
				columnName:     transactionID,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + transactionIsolationLevel,
				columnName:     transactionIsolationLevel,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + username,
				columnName:     username,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + waitResource,
				columnName:     waitResource,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + waitTimeMillisecond,
				columnName:     waitTimeMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any { return float64(i) / 1000.0 }),
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + waitType,
				columnName:     waitType,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + writes,
				columnName:     writes,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
		}

		for _, attr := range attributes {
			value, err := attr.valueRetriever(row, attr.columnName)
			if err != nil {
				errs = append(errs, err)
				s.logger.Error(fmt.Sprintf("sqlServerScraperHelper failed parsing %s. original value: %s, err: %s", attr.columnName, row[attr.columnName], err))
			}
			attr.valueSetter(record.Attributes(), attr.key, value)
		}

		// client.address: use host_name if it has a value; otherwise use client_net_address.
		// this value may not be accurate if
		// - there is a proxy between the sql client and the sql server, or
		// - the host_name value is empty or inaccurate.
		if row[hostName] != "" {
			record.Attributes().PutStr("client.address", row[hostName])
		} else {
			record.Attributes().PutStr("client.address", row[clientAddress])
		}

		record.SetEventName("query sample")
		record.Body().SetStr("sample")
	}

	return logs, errors.Join(errs...)
}
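// Illustrative sketch (not part of the receiver): what a consumer sees in the
// plog.Logs returned by recordDatabaseSampleQuery. The getters are standard
// pdata accessors; the attribute values depend on the sampled row.
//
//	logs, _ := s.recordDatabaseSampleQuery(ctx)
//	rec := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
//	_ = rec.EventName()  // "query sample"
//	_ = rec.Body().Str() // "sample"
//	addr, _ := rec.Attributes().Get("client.address")
//	_ = addr.Str() // host_name if present, client_net_address otherwise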