func NewParserTaskFromParser()

in pkg/parser/parser.go [64:177]


// NewParserTaskFromParser wraps parser into an inspection task that turns the logs
// produced by parser.LogTask() into history change sets.
//
// The task depends on parser.Dependencies(), parser.LogTask() and the history
// builder generator task. In dry-run mode it does nothing. Otherwise it runs in
// two phases:
//  1. builder.PrepareParseLogs is called on every log while a "prepared/total"
//     progress message is refreshed once per second.
//  2. The logs are split with parser.Grouper() and every group is handed to
//     builder.ParseLogsByGroups in its own goroutine, with at most
//     PARSER_MAX_THREADS groups running at once. When parser.Parse fails for a
//     single log, the failure is logged and nil is returned for that log instead
//     of failing the task. Progress is refreshed once per second as
//     "<logs per second> lps(concurrency <finished>/<started>)".
//
// The task fails with the first error returned by PrepareParseLogs or
// ParseLogsByGroups. If ctx is cancelled before every group has been scheduled,
// no further groups are started and the task fails with ctx.Err() once the
// already-running groups return.
//
// isDefaultFeature and availableInspectionTypes are forwarded to the feature
// label; labelOpts are appended after that label.
func NewParserTaskFromParser(taskId taskid.TaskImplementationID[struct{}], parser Parser, isDefaultFeature bool, availableInspectionTypes []string, labelOpts ...task.LabelOpt) task.Task[struct{}] {
	// Cap the slice at its length so append always allocates a new backing array
	// instead of writing into spare capacity of the slice returned by the parser.
	deps := parser.Dependencies()
	deps = append(deps[:len(deps):len(deps)], parser.LogTask(), inspection_task.BuilderGeneratorTaskID.Ref())

	return inspection_task.NewProgressReportableInspectionTask(taskId, deps, func(ctx context.Context, taskMode inspection_task_interface.InspectionTaskMode, tp *progress.TaskProgress) (struct{}, error) {
		if taskMode == inspection_task_interface.TaskModeDryRun {
			slog.DebugContext(ctx, "Skipping task because this is dry run mode")
			return struct{}{}, nil
		}
		builder := task.GetTaskResult(ctx, inspection_task.BuilderGeneratorTaskID.Ref())
		logs := task.GetTaskResult(ctx, parser.LogTask())

		// ratio converts a processed-log count into a progress fraction.
		// It reports 0 instead of NaN (0/0) when there are no logs.
		ratio := func(done int32) float32 {
			if len(logs) == 0 {
				return 0
			}
			return float32(done) / float32(len(logs))
		}

		// Phase 1: prepare every log, reporting progress through the updator.
		var preparedLogCount atomic.Int32
		updator := progress.NewProgressUpdator(tp, time.Second, func(tp *progress.TaskProgress) {
			current := preparedLogCount.Load()
			tp.Percentage = ratio(current)
			tp.Message = fmt.Sprintf("%d/%d", current, len(logs))
		})
		updator.Start(ctx)
		err := builder.PrepareParseLogs(ctx, logs, func() {
			preparedLogCount.Add(1)
		})
		// Stop the preparation reporter before the parse phase starts writing to tp,
		// and also on the error path so it is always stopped.
		updator.Done()
		if err != nil {
			return struct{}{}, err
		}

		// Phase 2: parse each group concurrently.
		// Counters are atomics because worker goroutines update them while the
		// reporter goroutine reads them.
		var parsedLogCount, startedThreadCount, doneThreadCount atomic.Int32

		progressCtx, stopProgress := context.WithCancel(ctx)
		progressStopped := make(chan struct{})
		go func() {
			defer close(progressStopped)
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			var lastLogCount int32
			for {
				select {
				case <-progressCtx.Done():
					return
				case <-ticker.C:
					current := parsedLogCount.Load()
					tp.Update(ratio(current), fmt.Sprintf("%d lps(concurrency %d/%d)", current-lastLogCount, doneThreadCount.Load(), startedThreadCount.Load()))
					lastLogCount = current
				}
			}
		}()
		// Wait for the reporter to exit so it never touches tp after this task returns.
		defer func() {
			stopProgress()
			<-progressStopped
		}()

		groups := parser.Grouper().Group(logs)
		// Semaphore bounding the number of groups parsed at the same time.
		limitChannel := make(chan struct{}, PARSER_MAX_THREADS)
		wg := errgroup.Group{}
		cancelled := false
	schedule:
		for key := range groups {
			select {
			case <-ctx.Done():
				// Stop scheduling new groups; already running ones are awaited below.
				cancelled = true
				break schedule
			case limitChannel <- struct{}{}:
			}
			groupedLogs := groups[key]
			startedThreadCount.Add(1)
			wg.Go(func() error { // TODO: replace this with pkg/common/worker/pool
				// Deferred so the semaphore slot and the done counter are released even
				// when CheckAndReportPanic recovers from a panic in the parser.
				defer func() { <-limitChannel }()
				defer doneThreadCount.Add(1)
				defer errorreport.CheckAndReportPanic()
				// Use a goroutine-local error; assigning the outer err here would race
				// between workers.
				return builder.ParseLogsByGroups(ctx, groupedLogs, func(_ int, l *log.LogEntity) *history.ChangeSet {
					cs := history.NewChangeSet(l)
					err := parser.Parse(ctx, l, cs, builder)
					parsedLogCount.Add(1)
					if err != nil {
						yaml, err2 := l.Fields.ToYaml("")
						if err2 != nil {
							yaml = "ERROR!! failed to dump in yaml"
						}
						slog.WarnContext(ctx, fmt.Sprintf("parser end with an error\n%s", err))
						slog.DebugContext(ctx, yaml)
						return nil
					}
					return cs
				})
			})
		}
		if err := wg.Wait(); err != nil {
			return struct{}{}, err
		}
		if cancelled {
			return struct{}{}, ctx.Err()
		}
		return struct{}{}, nil
	},
		append([]task.LabelOpt{
			inspection_task.FeatureTaskLabel(parser.GetParserName(), parser.Description(), parser.TargetLogType(), isDefaultFeature, availableInspectionTypes...),
		}, labelOpts...)...)
}