in pkg/parser/parser.go [64:177]
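// NewParserTaskFromParser wraps a Parser implementation into a progress-reportable
// inspection task. The task prepares the logs on the history builder, groups them
// with the parser specific grouper, then parses each group on its own goroutine
// (bounded by PARSER_MAX_THREADS) while reporting throughput once per second.
//
// Usage sketch (illustrative only; podParserTaskID, podParser and the inspection
// type name are assumed placeholders, not definitions from this package):
//
//	var PodParserTask = NewParserTaskFromParser(podParserTaskID, &podParser{}, true, []string{"example-inspection-type"})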
func NewParserTaskFromParser(taskId taskid.TaskImplementationID[struct{}], parser Parser, isDefaultFeature bool, availableInspectionTypes []string, labelOpts ...task.LabelOpt) task.Task[struct{}] {
return inspection_task.NewProgressReportableInspectionTask(taskId, append(parser.Dependencies(), parser.LogTask(), inspection_task.BuilderGeneratorTaskID.Ref()), func(ctx context.Context, taskMode inspection_task_interface.InspectionTaskMode, tp *progress.TaskProgress) (struct{}, error) {
if taskMode == inspection_task_interface.TaskModeDryRun {
slog.DebugContext(ctx, "Skipping task because this is dry run mode")
return struct{}{}, nil
}
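// Resolve results from dependency tasks: the history builder and the logs emitted by the parser's log task.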
builder := task.GetTaskResult(ctx, inspection_task.BuilderGeneratorTaskID.Ref())
logs := task.GetTaskResult(ctx, parser.LogTask())
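// Report progress of the log preparation phase once per second as prepared/total.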
preparedLogCount := atomic.Int32{}
updator := progress.NewProgressUpdator(tp, time.Second, func(tp *progress.TaskProgress) {
current := preparedLogCount.Load()
tp.Percentage = float32(current) / float32(len(logs))
tp.Message = fmt.Sprintf("%d/%d", current, len(logs))
})
updator.Start(ctx)
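// Prepare the logs on the builder; the callback fires once per prepared log.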
err := builder.PrepareParseLogs(ctx, logs, func() {
preparedLogCount.Add(1)
})
if err != nil {
return struct{}{}, err
}
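// Group the logs with the parser specific grouper; each group will be parsed by a single worker goroutine.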
grouper := parser.Grouper()
groups := grouper.Group(logs)
groupNames := []string{}
for key := range groups {
groupNames = append(groupNames, key)
}
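// limitChannel works as a semaphore bounding concurrent workers to PARSER_MAX_THREADS;
// logCounterChannel receives one message per parsed log for progress reporting.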
limitChannel := make(chan struct{}, PARSER_MAX_THREADS)
logCounterChannel := make(chan struct{})
currentGroup := 0
wg := errgroup.Group{}
parserStarted := false
threadCount := atomic.Int32{}
doneThreadCount := atomic.Int32{}
// Goroutine aggregating parsed-log counts and publishing parsing progress once per second.
go func() {
parsedLogCount := atomic.Int32{}
cancellable, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
lastLogCount := int32(0)
for {
select {
case <-cancellable.Done():
return
case <-time.After(time.Second):
if !parserStarted {
// First tick after parsing started: mark the preparation progress as done.
updator.Done()
parserStarted = true
}
current := parsedLogCount.Load()
// Show throughput in logs per second and finished/started worker counts.
tp.Update(float32(current)/float32(len(logs)), fmt.Sprintf("%d lps(concurrency %d/%d)", current-lastLogCount, doneThreadCount.Load(), threadCount.Load()))
lastLogCount = current
}
}
}()
// Count parsed logs reported by the workers over logCounterChannel.
for {
select {
case _, ok := <-logCounterChannel:
if !ok {
// Closed after wg.Wait below: all workers finished, stop monitoring.
return
}
parsedLogCount.Add(1)
case <-ctx.Done():
return
}
}
}()
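// Dispatch one worker goroutine per log group, throttled by the limitChannel semaphore.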
dispatchLoop:
for currentGroup < len(groupNames) {
select {
case <-ctx.Done():
// Stop dispatching new groups on cancellation; wg.Wait below still collects the workers already started.
break dispatchLoop
default:
// Acquire a semaphore slot; this blocks while PARSER_MAX_THREADS workers are already running.
limitChannel <- struct{}{}
groupedLogs := groups[groupNames[currentGroup]]
threadCount.Add(1)
wg.Go(func() error { // TODO: replace this with pkg/common/worker/pool
defer errorreport.CheckAndReportPanic()
// Count this worker as finished even when it exits early.
defer doneThreadCount.Add(1)
err := builder.ParseLogsByGroups(ctx, groupedLogs, func(logIndex int, l *log.LogEntity) *history.ChangeSet {
cs := history.NewChangeSet(l)
err := parser.Parse(ctx, l, cs, builder)
// Report the parsed log, but never block forever when the task context is cancelled.
select {
case logCounterChannel <- struct{}{}:
case <-ctx.Done():
}
if err != nil {
yaml, err2 := l.Fields.ToYaml("")
if err2 != nil {
yaml = "ERROR!! failed to dump in yaml"
}
slog.WarnContext(ctx, fmt.Sprintf("parser ended with an error\n%s", err))
slog.DebugContext(ctx, yaml)
return nil
}
return cs
})
// Release the semaphore slot before propagating any error from this group.
<-limitChannel
return err
})
currentGroup += 1
}
}
err = wg.Wait()
// All workers have returned; close the counter channel so the monitor goroutine exits even on error.
close(logCounterChannel)
if err != nil {
return struct{}{}, err
}
return struct{}{}, nil
},
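// Attach the parser's feature label ahead of any caller-supplied label options.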
append([]task.LabelOpt{
inspection_task.FeatureTaskLabel(parser.GetParserName(), parser.Description(), parser.TargetLogType(), isDefaultFeature, availableInspectionTypes...),
}, labelOpts...)...)
}