func()

in collector/otlp/logs_transfer.go [164:225]


func (s *LogsService) convertToLogBatch(msg *v1.ExportLogsServiceRequest, logBatch *types.LogBatch) int64 {
	if msg == nil {
		return 0
	}

	var droppedLogMissingMetadata int64 = 0
	for _, resourceLog := range msg.ResourceLogs {
		if resourceLog == nil {
			continue
		}

		resourceAttributes := make(map[string]any)
		if resourceLog.Resource != nil {
			extractKeyValues(resourceLog.Resource.Attributes, func(k string, v any) {
				resourceAttributes[k] = v
			})
		}

		for _, scope := range resourceLog.ScopeLogs {
			if scope == nil {
				continue
			}
			for _, record := range scope.LogRecords {
				if record == nil {
					continue
				}

				// Senders are required to include Kusto metadata in the attributes or body
				// Must have kusto.database and kusto.table
				dbName, tableName := otlp.KustoMetadata(record)
				if dbName == "" || tableName == "" {
					if logger.IsDebug() {
						s.logger.Warn("Missing Kusto metadata", "Payload", record.String())
					}

					droppedLogMissingMetadata++
					continue
				}

				log := types.LogPool.Get(1).(*types.Log)
				log.Reset()

				log.SetTimestamp(record.TimeUnixNano)
				log.SetObservedTimestamp(record.ObservedTimeUnixNano)
				extractKeyValues(record.Attributes, log.SetAttributeValue)
				extractBody(record.Body, log.SetBodyValue)

				for k, v := range resourceAttributes {
					log.SetResourceValue(k, v)
				}

				log.SetAttributeValue(types.AttributeDatabaseName, dbName)
				log.SetAttributeValue(types.AttributeTableName, tableName)

				metrics.LogKeys.WithLabelValues(dbName, tableName).Inc()

				logBatch.Logs = append(logBatch.Logs, log)
			}
		}
	}
	return droppedLogMissingMetadata
}