func New()

in loadgen/eventhandler/handler.go [167:245]


// New creates a Handler preloaded with the batches found in the files of
// config.Storage that match the config.Path glob pattern.
//
// config.Path, config.Transport and config.Writer are required. When
// config.Limiter is nil, a limiter built with rate.NewLimiter(rate.Inf, 0) is
// used; when config.Rand is nil, a math/rand generator seeded from crypto/rand
// is created.
//
// Every matching file is read line by line: empty lines are skipped, each
// remaining line is passed to ec.Filter, lines for which ec.IsMeta reports true
// start a new batch, and all other lines are converted with ec.Process and
// appended to the batch opened by the most recent metadata line of that file.
// The earliest non-zero event timestamp seen is stored in the handler's
// minTimestamp field.
//
// An error is returned when the configuration is incomplete, the glob pattern
// is invalid, a file cannot be opened or read, ec.Filter rejects a line, an
// event line appears before any metadata line in its file, or no batch is
// found at all.
func New(logger *zap.Logger, config Config, ec EventCollector) (*Handler, error) {
	// Validate all mandatory settings before doing any work.
	if config.Path == "" {
		return nil, errors.New("eventhandler: path is required")
	}
	if config.Transport == nil {
		return nil, errors.New("empty transport received")
	}
	if config.Writer == nil {
		return nil, errors.New("empty writer received")
	}

	// Fill in defaults for the optional settings.
	if config.Limiter == nil {
		config.Limiter = rate.NewLimiter(rate.Inf, 0)
	}
	if config.Rand == nil {
		var rngseed int64
		err := binary.Read(cryptorand.Reader, binary.LittleEndian, &rngseed)
		if err != nil {
			return nil, fmt.Errorf("failed to generate seed for math/rand: %w", err)
		}
		config.Rand = rand.New(rand.NewSource(rngseed))
	}

	h := Handler{
		logger: logger.Named("handler"),
		config: config,
	}

	matches, err := fs.Glob(config.Storage, config.Path)
	if err != nil {
		return nil, err
	}
	logger.Debug("file matching pattern found for batch extraction", zap.String("matches", strings.Join(matches, ",")))

	// loadFile appends the batches stored in a single file to h.batches. It is
	// a closure so that the deferred Close runs once per file instead of
	// piling up until New returns.
	loadFile := func(path string) error {
		f, err := config.Storage.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()

		// current points at the batch opened by the last metadata line of this
		// file. It is refreshed after every append to h.batches, so it never
		// refers to a stale backing array.
		var current *batch
		s := bufio.NewScanner(f)
		for s.Scan() {
			line := s.Bytes()
			if len(line) == 0 {
				continue
			}

			// TODO(marclop): Support RUM headers and handle them differently.
			if err := ec.Filter(line); err != nil {
				return fmt.Errorf("line filter failed: %w", err)
			}

			// Copy the line, as it will be overwritten by the next scan.
			linecopy := make([]byte, len(line))
			copy(linecopy, line)

			// A metadata line starts a new batch.
			if ec.IsMeta(line) {
				h.batches = append(h.batches, batch{
					metadata: linecopy,
				})
				current = &h.batches[len(h.batches)-1]
				continue
			}

			// An event needs a batch to belong to; fail cleanly instead of
			// dereferencing a nil batch pointer.
			if current == nil {
				return fmt.Errorf("eventhandler: event found before any metadata line in %s", path)
			}

			event := ec.Process(linecopy)
			if !event.timestamp.IsZero() &&
				(h.minTimestamp.IsZero() || event.timestamp.Before(h.minTimestamp)) {
				h.minTimestamp = event.timestamp
			}
			current.events = append(current.events, event)
		}
		// Scan stops silently on read errors and on lines longer than the
		// scanner buffer; surface that instead of replaying truncated data.
		if err := s.Err(); err != nil {
			return fmt.Errorf("eventhandler: failed reading %s: %w", path, err)
		}
		return nil
	}

	for _, path := range matches {
		if err := loadFile(path); err != nil {
			return nil, err
		}
	}
	if len(h.batches) == 0 {
		return nil, fmt.Errorf("eventhandler: found no elements to replay from %s", config.Path)
	}

	logger.Debug("collected batches", zap.Int("size", len(h.batches)))
	return &h, nil
}