in receiver/sqlserverreceiver/scraper.go [870:1161]
func (s *sqlServerScraperHelper) recordDatabaseSampleQuery(ctx context.Context) (plog.Logs, error) {
const blockingSessionID = "blocking_session_id"
const clientAddress = "client_address"
const clientPort = "client_port"
const command = "command"
const contextInfo = "context_info"
const cpuTimeMillisecond = "cpu_time"
const dbName = "db_name"
const dbPrefix = "sqlserver."
const deadlockPriority = "deadlock_priority"
const estimatedCompletionTimeMillisecond = "estimated_completion_time"
const hostName = "host_name"
const lockTimeoutMillisecond = "lock_timeout"
const logicalReads = "logical_reads"
const openTransactionCount = "open_transaction_count"
const percentComplete = "percent_complete"
const queryHash = "query_hash"
const queryPlanHash = "query_plan_hash"
const queryStart = "query_start"
const reads = "reads"
const requestStatus = "request_status"
const rowCount = "row_count"
const sessionID = "session_id"
const sessionStatus = "session_status"
const statementText = "statement_text"
const totalElapsedTimeMillisecond = "total_elapsed_time"
const transactionID = "transaction_id"
const transactionIsolationLevel = "transaction_isolation_level"
const username = "username"
const waitResource = "wait_resource"
const waitTimeMillisecond = "wait_time"
const waitType = "wait_type"
const writes = "writes"
rows, err := s.client.QueryRows(
ctx,
sql.Named("top", s.config.TopQueryCount),
)
if err != nil {
if !errors.Is(err, sqlquery.ErrNullValueWarning) {
return plog.Logs{}, fmt.Errorf("sqlServerScraperHelper failed getting log rows: %w", err)
}
// in case the sql returned rows contains null value, we just log a warning and continue
s.logger.Warn("problems encountered getting log rows", zap.Error(err))
}
var errs []error
logs := plog.NewLogs()
resourceLog := logs.ResourceLogs().AppendEmpty()
scopedLog := resourceLog.ScopeLogs().AppendEmpty()
scopedLog.Scope().SetName(metadata.ScopeName)
scopedLog.Scope().SetVersion("v0.0.1")
for _, row := range rows {
queryHashVal := hex.EncodeToString([]byte(row[queryHash]))
queryPlanHashVal := hex.EncodeToString([]byte(row[queryPlanHash]))
contextInfoVal := hex.EncodeToString([]byte(row[contextInfo]))
record := scopedLog.LogRecords().AppendEmpty()
record.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
// Attributes sorted alphabetically by key
attributes := []internalAttribute{
{
key: "client.port",
columnName: clientPort,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: "db.namespace",
columnName: dbName,
valueRetriever: vanillaRetriever,
valueSetter: setString,
},
{
key: "db.query.text",
columnName: statementText,
valueRetriever: func(row sqlquery.StringMap, columnName string) (any, error) {
return obfuscateSQL(row[columnName])
},
valueSetter: setString,
},
{
key: "db.system.name",
valueRetriever: defaultValueRetriever("microsoft.sql_server"),
valueSetter: setString,
},
{
key: "network.peer.address",
columnName: clientAddress,
valueRetriever: vanillaRetriever,
valueSetter: setString,
},
{
key: "network.peer.port",
columnName: clientPort,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
// the following ones are the attributes that are not in the semantic conventions
{
key: dbPrefix + blockingSessionID,
columnName: blockingSessionID,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: dbPrefix + command,
columnName: command,
valueRetriever: vanillaRetriever,
valueSetter: setString,
},
{
key: dbPrefix + contextInfo,
valueRetriever: defaultValueRetriever(contextInfoVal),
valueSetter: setString,
},
{
key: dbPrefix + cpuTimeMillisecond,
columnName: cpuTimeMillisecond,
valueRetriever: retrieveIntAndConvert(func(i int64) any {
return float64(i) / 1000.0
}),
valueSetter: setDouble,
},
{
key: dbPrefix + deadlockPriority,
columnName: deadlockPriority,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: dbPrefix + estimatedCompletionTimeMillisecond,
columnName: estimatedCompletionTimeMillisecond,
valueRetriever: retrieveIntAndConvert(func(i int64) any {
return float64(i) / 1000.0
}),
valueSetter: setDouble,
},
{
key: dbPrefix + lockTimeoutMillisecond,
columnName: lockTimeoutMillisecond,
valueRetriever: retrieveIntAndConvert(func(i int64) any {
return float64(i) / 1000.0
}),
valueSetter: setDouble,
},
{
key: dbPrefix + logicalReads,
columnName: logicalReads,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: dbPrefix + openTransactionCount,
columnName: openTransactionCount,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: dbPrefix + percentComplete,
columnName: percentComplete,
valueRetriever: retrieveFloat,
valueSetter: setDouble,
},
{
key: dbPrefix + queryHash,
valueRetriever: defaultValueRetriever(queryHashVal),
valueSetter: setString,
},
{
key: dbPrefix + queryPlanHash,
valueRetriever: defaultValueRetriever(queryPlanHashVal),
valueSetter: setString,
},
{
key: dbPrefix + queryStart,
columnName: queryStart,
valueRetriever: vanillaRetriever,
valueSetter: setString,
},
{
key: dbPrefix + reads,
columnName: reads,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: dbPrefix + requestStatus,
columnName: requestStatus,
valueRetriever: vanillaRetriever,
valueSetter: setString,
},
{
key: dbPrefix + rowCount,
columnName: rowCount,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: dbPrefix + sessionID,
columnName: sessionID,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: dbPrefix + sessionStatus,
columnName: sessionStatus,
valueRetriever: vanillaRetriever,
valueSetter: setString,
},
{
key: dbPrefix + totalElapsedTimeMillisecond,
columnName: totalElapsedTimeMillisecond,
valueRetriever: retrieveIntAndConvert(func(i int64) any {
return float64(i) / 1000.0
}),
valueSetter: setDouble,
},
{
key: dbPrefix + transactionID,
columnName: transactionID,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: dbPrefix + transactionIsolationLevel,
columnName: transactionIsolationLevel,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
{
key: dbPrefix + username,
columnName: username,
valueRetriever: vanillaRetriever,
valueSetter: setString,
},
{
key: dbPrefix + waitResource,
columnName: waitResource,
valueRetriever: vanillaRetriever,
valueSetter: setString,
},
{
key: dbPrefix + waitTimeMillisecond,
columnName: waitTimeMillisecond,
valueRetriever: retrieveIntAndConvert(func(i int64) any {
return float64(i) / 1000.0
}),
valueSetter: setDouble,
},
{
key: dbPrefix + waitType,
columnName: waitType,
valueRetriever: vanillaRetriever,
valueSetter: setString,
},
{
key: dbPrefix + writes,
columnName: writes,
valueRetriever: retrieveInt,
valueSetter: setInt,
},
}
for _, attr := range attributes {
value, err := attr.valueRetriever(row, attr.columnName)
if err != nil {
errs = append(errs, err)
s.logger.Error(fmt.Sprintf("sqlServerScraperHelper failed parsing %s. original value: %s, err: %s", attr.columnName, row[attr.columnName], err))
}
attr.valueSetter(record.Attributes(), attr.key, value)
}
// client.address: use host_name if it has value, if not, use client_net_address.
// this value may not be accurate if
// - there is proxy in the middle of sql client and sql server. Or
// - host_name value is empty or not accurate.
if row[hostName] != "" {
record.Attributes().PutStr("client.address", row[hostName])
} else {
record.Attributes().PutStr("client.address", row[clientAddress])
}
record.SetEventName("query sample")
record.Body().SetStr("sample")
}
return logs, errors.Join(errs...)
}