in collector/logs/sources/kernel/kernel.go [69:138]
// Open prepares the source and launches its background pipeline.
//
// It selects the ack generator (a cursor-persisting one only when a cursor
// directory is configured), restores each target's cursor file name, saved
// offset and priority, and then starts three goroutines tracked by s.wg: the
// kernel log reader, the batcher, and the worker built by s.workerCreator.
// The cancel function of the derived context is stored in s.cancel.
//
// Open always returns nil.
func (s *KernelSource) Open(ctx context.Context) error {
	// Acks are no-ops unless a cursor directory is configured.
	s.ackGenerator = noopAckGenerator
	if s.cursorDir != "" {
		s.ackGenerator = func(log *types.Log) func() {
			name := types.StringOrEmpty(log.GetAttributeValue(kernelCursorFilename))
			raw, found := log.GetAttributeValue(kernelSequenceAttr)
			if name == "" || !found {
				return noopAck
			}
			// Only a uint64 sequence attribute can be acknowledged.
			if seq, isUint := raw.(uint64); isUint {
				return func() {
					s.ackSequence(seq, name)
				}
			}
			return noopAck
		}
	}

	// Restore per-target state. A readOffset error is ignored, which leaves
	// the target's processed offset at its current value.
	// NOTE(review): filepath.Join drops empty elements, so with an empty
	// cursorDir the lookup path is relative to the working directory —
	// confirm this is intended.
	for idx := range s.targets {
		tgt := s.targets[idx]
		tgt.cursorFile = SafeFilename(tgt.Database, tgt.Table)
		cursorPath := filepath.Join(s.cursorDir, tgt.cursorFile)
		saved, err := readOffset(cursorPath)
		if err == nil {
			tgt.processed = saved
		}
		tgt.priority = stringToPriority(tgt.PriorityFilter)
		s.targets[idx] = tgt
	}

	runCtx, stop := context.WithCancel(ctx)
	s.cancel = stop

	batches := make(chan *types.LogBatch, 1)
	logs := make(chan *types.Log, 512)

	// spawn runs fn on its own goroutine and tracks it in s.wg.
	spawn := func(fn func()) {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			fn()
		}()
	}

	// Reader: feeds kernel log entries into logs.
	spawn(func() {
		s.readKernelLogs(runCtx, logs)
	})

	// Batcher: configured on this goroutine so s.ackGenerator is read before
	// the batching goroutine starts.
	cfg := engine.BatchConfig{
		MaxBatchSize: 1000,
		MaxBatchWait: 1 * time.Second,
		InputQueue:   logs,
		OutputQueue:  batches,
		AckGenerator: s.ackGenerator,
	}
	spawn(func() {
		engine.BatchLogs(runCtx, cfg)
	})

	// Worker: created for this source's name and handed the batch queue.
	w := s.workerCreator(s.Name(), batches)
	spawn(func() {
		w.Run()
	})

	return nil
}