func()

in receiver/sqlserverreceiver/scraper.go [870:1161]


func (s *sqlServerScraperHelper) recordDatabaseSampleQuery(ctx context.Context) (plog.Logs, error) {
	const blockingSessionID = "blocking_session_id"
	const clientAddress = "client_address"
	const clientPort = "client_port"
	const command = "command"
	const contextInfo = "context_info"
	const cpuTimeMillisecond = "cpu_time"
	const dbName = "db_name"
	const dbPrefix = "sqlserver."
	const deadlockPriority = "deadlock_priority"
	const estimatedCompletionTimeMillisecond = "estimated_completion_time"
	const hostName = "host_name"
	const lockTimeoutMillisecond = "lock_timeout"
	const logicalReads = "logical_reads"
	const openTransactionCount = "open_transaction_count"
	const percentComplete = "percent_complete"
	const queryHash = "query_hash"
	const queryPlanHash = "query_plan_hash"
	const queryStart = "query_start"
	const reads = "reads"
	const requestStatus = "request_status"
	const rowCount = "row_count"
	const sessionID = "session_id"
	const sessionStatus = "session_status"
	const statementText = "statement_text"
	const totalElapsedTimeMillisecond = "total_elapsed_time"
	const transactionID = "transaction_id"
	const transactionIsolationLevel = "transaction_isolation_level"
	const username = "username"
	const waitResource = "wait_resource"
	const waitTimeMillisecond = "wait_time"
	const waitType = "wait_type"
	const writes = "writes"

	rows, err := s.client.QueryRows(
		ctx,
		sql.Named("top", s.config.TopQueryCount),
	)
	if err != nil {
		if !errors.Is(err, sqlquery.ErrNullValueWarning) {
			return plog.Logs{}, fmt.Errorf("sqlServerScraperHelper failed getting log rows: %w", err)
		}
		// in case the sql returned rows contains null value, we just log a warning and continue
		s.logger.Warn("problems encountered getting log rows", zap.Error(err))
	}

	var errs []error
	logs := plog.NewLogs()

	resourceLog := logs.ResourceLogs().AppendEmpty()

	scopedLog := resourceLog.ScopeLogs().AppendEmpty()
	scopedLog.Scope().SetName(metadata.ScopeName)
	scopedLog.Scope().SetVersion("v0.0.1")
	for _, row := range rows {
		queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
		queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))
		contextInfoVal := hex.EncodeToString([]byte(row[contextInfo]))

		record := scopedLog.LogRecords().AppendEmpty()
		record.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))

		// Attributes sorted alphabetically by key
		attributes := []internalAttribute{
			{
				key:            "client.port",
				columnName:     clientPort,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            "db.namespace",
				columnName:     dbName,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:        "db.query.text",
				columnName: statementText,
				valueRetriever: func(row sqlquery.StringMap, columnName string) (any, error) {
					return obfuscateSQL(row[columnName])
				},
				valueSetter: setString,
			},
			{
				key:            "db.system.name",
				valueRetriever: defaultValueRetriever("microsoft.sql_server"),
				valueSetter:    setString,
			},
			{
				key:            "network.peer.address",
				columnName:     clientAddress,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            "network.peer.port",
				columnName:     clientPort,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			// the following ones are the attributes that are not in the semantic conventions
			{
				key:            dbPrefix + blockingSessionID,
				columnName:     blockingSessionID,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + command,
				columnName:     command,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + contextInfo,
				valueRetriever: defaultValueRetriever(contextInfoVal),
				valueSetter:    setString,
			},
			{
				key:        dbPrefix + cpuTimeMillisecond,
				columnName: cpuTimeMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any {
					return float64(i) / 1000.0
				}),
				valueSetter: setDouble,
			},
			{
				key:            dbPrefix + deadlockPriority,
				columnName:     deadlockPriority,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:        dbPrefix + estimatedCompletionTimeMillisecond,
				columnName: estimatedCompletionTimeMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any {
					return float64(i) / 1000.0
				}),
				valueSetter: setDouble,
			},
			{
				key:        dbPrefix + lockTimeoutMillisecond,
				columnName: lockTimeoutMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any {
					return float64(i) / 1000.0
				}),
				valueSetter: setDouble,
			},
			{
				key:            dbPrefix + logicalReads,
				columnName:     logicalReads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + openTransactionCount,
				columnName:     openTransactionCount,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + percentComplete,
				columnName:     percentComplete,
				valueRetriever: retrieveFloat,
				valueSetter:    setDouble,
			},
			{
				key:            dbPrefix + queryHash,
				valueRetriever: defaultValueRetriever(queryHashVal),
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + queryPlanHash,
				valueRetriever: defaultValueRetriever(queryPlanHashVal),
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + queryStart,
				columnName:     queryStart,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + reads,
				columnName:     reads,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + requestStatus,
				columnName:     requestStatus,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + rowCount,
				columnName:     rowCount,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + sessionID,
				columnName:     sessionID,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + sessionStatus,
				columnName:     sessionStatus,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:        dbPrefix + totalElapsedTimeMillisecond,
				columnName: totalElapsedTimeMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any {
					return float64(i) / 1000.0
				}),
				valueSetter: setDouble,
			},
			{
				key:            dbPrefix + transactionID,
				columnName:     transactionID,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + transactionIsolationLevel,
				columnName:     transactionIsolationLevel,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
			{
				key:            dbPrefix + username,
				columnName:     username,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + waitResource,
				columnName:     waitResource,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:        dbPrefix + waitTimeMillisecond,
				columnName: waitTimeMillisecond,
				valueRetriever: retrieveIntAndConvert(func(i int64) any {
					return float64(i) / 1000.0
				}),
				valueSetter: setDouble,
			},
			{
				key:            dbPrefix + waitType,
				columnName:     waitType,
				valueRetriever: vanillaRetriever,
				valueSetter:    setString,
			},
			{
				key:            dbPrefix + writes,
				columnName:     writes,
				valueRetriever: retrieveInt,
				valueSetter:    setInt,
			},
		}

		for _, attr := range attributes {
			value, err := attr.valueRetriever(row, attr.columnName)
			if err != nil {
				errs = append(errs, err)
				s.logger.Error(fmt.Sprintf("sqlServerScraperHelper failed parsing %s. original value: %s, err: %s", attr.columnName, row[attr.columnName], err))
			}
			attr.valueSetter(record.Attributes(), attr.key, value)
		}

		// client.address: use host_name if it has value, if not, use client_net_address.
		// this value may not be accurate if
		// - there is proxy in the middle of sql client and sql server. Or
		// - host_name value is empty or not accurate.
		if row[hostName] != "" {
			record.Attributes().PutStr("client.address", row[hostName])
		} else {
			record.Attributes().PutStr("client.address", row[clientAddress])
		}

		record.SetEventName("query sample")

		record.Body().SetStr("sample")
	}
	return logs, errors.Join(errs...)
}