collector/logs/sources/kernel/kernel.go (209 lines of code) (raw):

package kernel import ( "context" "crypto/sha256" "encoding/binary" "fmt" "os" "path/filepath" "strings" "sync" "time" "github.com/Azure/adx-mon/collector/logs/engine" "github.com/Azure/adx-mon/collector/logs/types" "github.com/Azure/adx-mon/pkg/logger" "github.com/siderolabs/go-kmsg" ) // Constants for attribute keys const ( kernelSequenceAttr = "adxmon_kernel_sequence" kernelCursorFilename = "adxmon_kernel_cursor_filename" ) // KernelSourceConfig configures KernelSource. type KernelSourceConfig struct { WorkerCreator engine.WorkerCreatorFunc CursorDirectory string Targets []KernelTargetConfig } type KernelTargetConfig struct { Database string Table string PriorityFilter string // processed is the last processed sequence number as // stored in the cursor file if one exists. processed uint64 // cursorFile is the name of the cursor file cursorFile string // priority is the kmsg priority of the log, which speeds up comparison priority kmsg.Priority } // KernelSource implements the types.Source interface for reading kernel logs. type KernelSource struct { workerCreator engine.WorkerCreatorFunc ackGenerator func(*types.Log) func() targets []KernelTargetConfig cursorDir string cancel context.CancelFunc wg sync.WaitGroup // Added WaitGroup for goroutine lifecycle management reader kmsg.Reader // Add reader field for testing } // NewKernelSource creates a new KernelSource. func NewKernelSource(config KernelSourceConfig) (*KernelSource, error) { return &KernelSource{ workerCreator: config.WorkerCreator, targets: config.Targets, cursorDir: config.CursorDirectory, }, nil } // Open starts reading kernel logs. 
func (s *KernelSource) Open(ctx context.Context) error { s.ackGenerator = noopAckGenerator if s.cursorDir != "" { s.ackGenerator = func(log *types.Log) func() { cursorFileName := types.StringOrEmpty(log.GetAttributeValue(kernelCursorFilename)) cursorPositionVal, ok := log.GetAttributeValue(kernelSequenceAttr) if !ok || cursorFileName == "" { return noopAck } cursorPosition, ok := cursorPositionVal.(uint64) if !ok { return noopAck } return func() { s.ackSequence(cursorPosition, cursorFileName) } } } for i, target := range s.targets { target.cursorFile = SafeFilename(target.Database, target.Table) fp := filepath.Join(s.cursorDir, target.cursorFile) if offset, err := readOffset(fp); err == nil { target.processed = offset } target.priority = stringToPriority(target.PriorityFilter) s.targets[i] = target } ctx, cancel := context.WithCancel(ctx) s.cancel = cancel outputQueue := make(chan *types.LogBatch, 1) batchQueue := make(chan *types.Log, 512) // Start reading kernel logs s.wg.Add(1) go func() { defer s.wg.Done() s.readKernelLogs(ctx, batchQueue) }() // Setup batching batchConfig := engine.BatchConfig{ MaxBatchSize: 1000, MaxBatchWait: 1 * time.Second, InputQueue: batchQueue, OutputQueue: outputQueue, AckGenerator: s.ackGenerator, } s.wg.Add(1) go func() { defer s.wg.Done() engine.BatchLogs(ctx, batchConfig) }() // Create and start worker worker := s.workerCreator(s.Name(), outputQueue) s.wg.Add(1) go func() { defer s.wg.Done() worker.Run() }() return nil } // Close stops reading kernel logs and waits for goroutines to finish. func (s *KernelSource) Close() error { s.cancel() // Wait for all goroutines to finish s.wg.Wait() return nil } // Name returns the name of the source. 
func (s *KernelSource) Name() string {
	return "kernelsource"
}

// ackSequence writes the sequence number to the offset file named cursorFile
// inside the cursor directory. A write failure is logged and otherwise ignored.
func (s *KernelSource) ackSequence(sequence uint64, cursorFile string) {
	if err := writeOffset(filepath.Join(s.cursorDir, cursorFile), sequence); err != nil {
		logger.Errorf("failed to write kernel offset: %v", err)
	}
}

// readKernelLogs reads logs from the kernel message buffer and fans each entry
// out to every configured target, emitting one log per matching target on
// outputQueue. It returns when the reader's scan channel is closed or when ctx
// is cancelled while a send is pending.
func (s *KernelSource) readKernelLogs(ctx context.Context, outputQueue chan<- *types.Log) {
	// Prefer the injected reader (used for testing); otherwise open the real one.
	reader := s.reader
	if reader == nil {
		var err error
		reader, err = kmsg.NewReader(kmsg.Follow())
		if err != nil {
			logger.Errorf("failed to create kmsg reader: %v", err)
			return
		}
	}
	defer reader.Close()

	// Read kernel log entries.
	// There is only one kernel log source stream but n targets.
	for pkt := range reader.Scan(ctx) {
		if pkt.Err != nil {
			logger.Errorf("failed to read kernel log: %v", pkt.Err)
			continue
		}

		entrySeq := uint64(pkt.Message.SequenceNumber)
		// Index instead of ranging by value to avoid copying each target struct.
		for i := range s.targets {
			target := &s.targets[i]

			if target.processed != 0 && entrySeq <= target.processed {
				continue // skip already processed logs
			}
			// NOTE(review): presumably kmsg priorities follow the syslog
			// ordering where a larger value is less severe — confirm.
			if target.priority < pkt.Message.Priority {
				continue // skip logs with lower priority
			}

			entry := types.LogPool.Get(1).(*types.Log)
			entry.Reset()
			entry.SetTimestamp(uint64(pkt.Message.Timestamp.UnixNano()))
			entry.SetObservedTimestamp(uint64(time.Now().UnixNano()))
			entry.SetBodyValue(types.BodyKeyMessage, pkt.Message.Message)
			entry.SetAttributeValue("priority", pkt.Message.Priority)
			entry.SetAttributeValue(types.AttributeDatabaseName, target.Database)
			entry.SetAttributeValue(types.AttributeTableName, target.Table)
			entry.SetAttributeValue(kernelSequenceAttr, entrySeq)
			entry.SetAttributeValue(kernelCursorFilename, target.cursorFile)

			// A bare send could block forever once the consumer has stopped
			// and the queue is full, which would make Close hang while
			// waiting for this goroutine. Give up as soon as ctx is cancelled.
			select {
			case outputQueue <- entry:
			case <-ctx.Done():
				return
			}
		}
	}
}

// SafeFilename creates a Linux filesystem-safe filename from database and table names.
// It prefixes the name with "kernel_" and uses a short hash to avoid length issues.
func SafeFilename(database, table string) string { // SHA-256 hash (shorter, not reversible) combined := database + "_" + table hash := sha256.Sum256([]byte(combined)) // Use first 8 bytes of hash for a reasonable length filename hashStr := fmt.Sprintf("%x", hash[:8]) return fmt.Sprintf("kernel_%s", hashStr) } func readOffset(path string) (uint64, error) { data, err := os.ReadFile(path) if err != nil { return 0, err } // File doesn't contain enough bytes if len(data) < 8 { return 0, fmt.Errorf("invalid offset file format: insufficient data") } // Read uint64 directly from bytes (little-endian) return binary.LittleEndian.Uint64(data[:8]), nil } func writeOffset(path string, sequence uint64) error { // Write directly as binary data := make([]byte, 8) binary.LittleEndian.PutUint64(data, sequence) return os.WriteFile(path, data, 0644) } // stringToPriority converts a priority string to kmsg.Priority func stringToPriority(priorityStr string) kmsg.Priority { // Default to Info if empty if priorityStr == "" { return kmsg.Info } // Convert string to Priority enum switch strings.ToLower(priorityStr) { case "emerg": return kmsg.Emerg case "alert": return kmsg.Alert case "crit": return kmsg.Crit case "err", "error": return kmsg.Err case "warning", "warn": return kmsg.Warning case "notice": return kmsg.Notice case "info": return kmsg.Info case "debug": return kmsg.Debug default: // If invalid, default to Info logger.Warnf("Invalid priority filter: %s, defaulting to 'info'", priorityStr) return kmsg.Info } } var ( noopAck = func() {} noopAckGenerator = func(*types.Log) func() { return noopAck } )