in collector/otlp/logs_transfer.go [164:225]
// convertToLogBatch walks every log record in msg and appends one *types.Log
// to logBatch.Logs for each record that carries Kusto routing metadata.
//
// Records for which otlp.KustoMetadata returns an empty database or table name
// are skipped and counted. Nil resource logs, scopes and records are skipped
// without being counted.
//
// It returns the number of records skipped for missing metadata; a nil msg
// yields 0 and leaves logBatch untouched.
func (s *LogsService) convertToLogBatch(msg *v1.ExportLogsServiceRequest, logBatch *types.LogBatch) int64 {
	var dropped int64
	if msg == nil {
		return dropped
	}

	for _, rl := range msg.ResourceLogs {
		if rl == nil {
			continue
		}

		// Gather the resource-level attributes once per resource; they are
		// copied onto every accepted record beneath it.
		resAttrs := make(map[string]any)
		if res := rl.Resource; res != nil {
			extractKeyValues(res.Attributes, func(key string, val any) {
				resAttrs[key] = val
			})
		}

		for _, sl := range rl.ScopeLogs {
			if sl == nil {
				continue
			}

			for _, rec := range sl.LogRecords {
				if rec == nil {
					continue
				}

				// Both kusto.database and kusto.table must be supplied by the
				// sender (in the attributes or the body); otherwise the record
				// is dropped.
				database, table := otlp.KustoMetadata(rec)
				if database == "" || table == "" {
					dropped++
					// Rendering the payload is only done when debug logging is on.
					if logger.IsDebug() {
						s.logger.Warn("Missing Kusto metadata", "Payload", rec.String())
					}
					continue
				}

				// Take an entry from the pool and reset it before populating.
				entry := types.LogPool.Get(1).(*types.Log)
				entry.Reset()

				entry.SetTimestamp(rec.TimeUnixNano)
				entry.SetObservedTimestamp(rec.ObservedTimeUnixNano)
				extractKeyValues(rec.Attributes, entry.SetAttributeValue)
				extractBody(rec.Body, entry.SetBodyValue)

				for key, val := range resAttrs {
					entry.SetResourceValue(key, val)
				}

				// Set the routing attributes last, after the record's own
				// attributes have been copied.
				entry.SetAttributeValue(types.AttributeDatabaseName, database)
				entry.SetAttributeValue(types.AttributeTableName, table)

				metrics.LogKeys.WithLabelValues(database, table).Inc()
				logBatch.Logs = append(logBatch.Logs, entry)
			}
		}
	}

	return dropped
}