in loadgen/eventhandler/handler.go [167:245]
// New creates a Handler that holds the batches read from every file in
// config.Storage whose name matches the glob pattern config.Path.
//
// config.Path, config.Transport and config.Writer are required; an error is
// returned when any of them is unset. When config.Limiter is nil a limiter
// with an infinite rate is used, and when config.Rand is nil a math/rand
// generator seeded from crypto/rand is created.
//
// An error is returned when the pattern is malformed, a file cannot be opened
// or scanned, ec.Filter rejects a line, an event line appears before any
// metadata line in a file, or no batch at all could be collected.
func New(logger *zap.Logger, config Config, ec EventCollector) (*Handler, error) {
	if config.Path == "" {
		return nil, fmt.Errorf("eventhandler: path is required")
	}
	if config.Transport == nil {
		return nil, errors.New("empty transport received")
	}
	if config.Limiter == nil {
		config.Limiter = rate.NewLimiter(rate.Inf, 0)
	}
	if config.Rand == nil {
		var rngseed int64
		err := binary.Read(cryptorand.Reader, binary.LittleEndian, &rngseed)
		if err != nil {
			return nil, fmt.Errorf("failed to generate seed for math/rand: %w", err)
		}
		config.Rand = rand.New(rand.NewSource(rngseed))
	}
	if config.Writer == nil {
		return nil, errors.New("empty writer received")
	}
	h := Handler{
		logger: logger.Named("handler"),
		config: config,
	}
	matches, err := fs.Glob(config.Storage, config.Path)
	if err != nil {
		return nil, err
	}
	logger.Debug("file matching pattern found for batch extraction", zap.String("matches", strings.Join(matches, ",")))
	for _, path := range matches {
		if err := h.loadBatchesFromFile(path, ec); err != nil {
			return nil, err
		}
	}
	if len(h.batches) == 0 {
		return nil, fmt.Errorf("eventhandler: found no elements to replay from %s", config.Path)
	}
	logger.Debug("collected batches", zap.Int("size", len(h.batches)))
	return &h, nil
}

// loadBatchesFromFile reads the file at path from h.config.Storage line by
// line and appends the batches it contains to h.batches. A metadata line (as
// reported by ec.IsMeta) starts a new batch; every other non-empty line is
// converted with ec.Process and appended to the batch started most recently
// in the same file. h.minTimestamp is lowered to the earliest non-zero event
// timestamp seen.
func (h *Handler) loadBatchesFromFile(path string, ec EventCollector) error {
	f, err := h.config.Storage.Open(path)
	if err != nil {
		return err
	}
	// The file is only read, so an error from Close carries nothing actionable.
	defer f.Close()

	// current points at the batch that events are being appended to. It is
	// scoped to a single file, so events never attach to a previous file's batch.
	var current *batch
	s := bufio.NewScanner(f)
	for s.Scan() {
		line := s.Bytes()
		if len(line) == 0 {
			continue
		}
		// TODO(marclop): Support RUM headers and handle them differently.
		if err := ec.Filter(line); err != nil {
			return fmt.Errorf("line filter failed: %w", err)
		}
		// Copy the line, as it will be overwritten by the next scan.
		linecopy := make([]byte, len(line))
		copy(linecopy, line)
		// A metadata line starts a new batch.
		if ec.IsMeta(line) {
			h.batches = append(h.batches, batch{
				metadata: linecopy,
			})
			// Re-take the address after every append: growing the slice may
			// move its elements to a new backing array.
			current = &h.batches[len(h.batches)-1]
			continue
		}
		// An event without preceding metadata has no batch to belong to;
		// report it instead of dereferencing a nil batch.
		if current == nil {
			return fmt.Errorf("eventhandler: event found before any metadata line in %s", path)
		}
		event := ec.Process(linecopy)
		if !event.timestamp.IsZero() {
			if h.minTimestamp.IsZero() || event.timestamp.Before(h.minTimestamp) {
				h.minTimestamp = event.timestamp
			}
		}
		current.events = append(current.events, event)
	}
	// Scan returns false both at EOF and on failure (for example a line longer
	// than the scanner buffer); only Err tells the two apart.
	if err := s.Err(); err != nil {
		return fmt.Errorf("eventhandler: failed to scan %s: %w", path, err)
	}
	return nil
}