func()

in collector/logs/sources/kernel/kernel.go [69:138]


// Open prepares the source and starts its processing pipeline.
//
// It installs the ack generator (a no-op unless a cursor directory is
// configured), fills in each target's cursor file name, saved offset and
// priority, and then launches three goroutines tracked by s.wg: the kernel
// log reader, the batcher, and the worker built by s.workerCreator. The
// reader and batcher receive a context derived from ctx; its cancel function
// is stored in s.cancel. Open always returns nil.
func (s *KernelSource) Open(ctx context.Context) error {
	if s.cursorDir == "" {
		s.ackGenerator = noopAckGenerator
	} else {
		s.ackGenerator = func(entry *types.Log) func() {
			file := types.StringOrEmpty(entry.GetAttributeValue(kernelCursorFilename))
			rawSeq, found := entry.GetAttributeValue(kernelSequenceAttr)
			if file == "" || !found {
				return noopAck
			}

			// Only a uint64 sequence can be acknowledged; anything else
			// falls through to the no-op ack.
			if seq, isUint := rawSeq.(uint64); isUint {
				return func() {
					s.ackSequence(seq, file)
				}
			}
			return noopAck
		}
	}

	// Work on a copy of each target and store it back once it is fully
	// initialised.
	for idx := range s.targets {
		tgt := s.targets[idx]
		tgt.cursorFile = SafeFilename(tgt.Database, tgt.Table)

		// A failed cursor read is not an error here: the target simply keeps
		// whatever processed offset it already had.
		offset, err := readOffset(filepath.Join(s.cursorDir, tgt.cursorFile))
		if err == nil {
			tgt.processed = offset
		}

		tgt.priority = stringToPriority(tgt.PriorityFilter)
		s.targets[idx] = tgt
	}

	runCtx, stop := context.WithCancel(ctx)
	s.cancel = stop

	batches := make(chan *types.LogBatch, 1)
	logs := make(chan *types.Log, 512)

	// launch runs fn on its own goroutine and registers it with s.wg.
	launch := func(fn func()) {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			fn()
		}()
	}

	// Stage 1: read kernel logs into the batcher's input queue.
	launch(func() {
		s.readKernelLogs(runCtx, logs)
	})

	// Stage 2: group individual logs into batches.
	batching := engine.BatchConfig{
		MaxBatchSize: 1000,
		MaxBatchWait: 1 * time.Second,
		InputQueue:   logs,
		OutputQueue:  batches,
		AckGenerator: s.ackGenerator,
	}
	launch(func() {
		engine.BatchLogs(runCtx, batching)
	})

	// Stage 3: hand the batch queue to the worker.
	worker := s.workerCreator(s.Name(), batches)
	launch(func() {
		worker.Run()
	})

	return nil
}