pkg/parser/parser.go (139 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parser
import (
"context"
"fmt"
"log/slog"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/khi/pkg/common/errorreport"
inspection_task_interface "github.com/GoogleCloudPlatform/khi/pkg/inspection/interface"
"github.com/GoogleCloudPlatform/khi/pkg/inspection/metadata/progress"
inspection_task "github.com/GoogleCloudPlatform/khi/pkg/inspection/task"
"github.com/GoogleCloudPlatform/khi/pkg/log"
"github.com/GoogleCloudPlatform/khi/pkg/model/enum"
"github.com/GoogleCloudPlatform/khi/pkg/model/history"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/grouper"
"github.com/GoogleCloudPlatform/khi/pkg/task"
"github.com/GoogleCloudPlatform/khi/pkg/task/taskid"
"golang.org/x/sync/errgroup"
)
// PARSER_MAX_THREADS is the maximum number of goroutines parsing log groups concurrently.
var PARSER_MAX_THREADS = 16
// Parser is the common interface across all of the log parsers in KHI.
type Parser interface {
	// GetParserName returns its own parser name. It must be unique for each instance.
	GetParserName() string
	// TargetLogType returns the log type which this parser should mainly parse and generate revisions or events for.
	TargetLogType() enum.LogType
	// Parse parses a single log. Returning an error skips this log and delegates it to later parsers.
	Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error
	// Description returns a comprehensive description of the parser.
	// Parser tasks are registered as a `feature task` and the description is shown on the frontend.
	Description() string
	// LogTask returns the task ID generating []*log.LogEntity consumed by this parser.
	LogTask() taskid.TaskReference[[]*log.LogEntity]
	// Dependencies returns the list of task IDs excluding the log task.
	Dependencies() []taskid.UntypedTaskReference
	// Grouper returns a LogGrouper that groups logs into multiple sets. These sets are sorted individually and parsed in parallel, then merged later.
	Grouper() grouper.LogGrouper
}
// NewParserTaskFromParser wraps a Parser into a progress-reportable inspection task.
//
// The task prepares all logs, groups them with the parser's Grouper, and parses each
// group on its own goroutine, bounded to PARSER_MAX_THREADS concurrent workers.
// Progress (logs/second and worker concurrency) is reported once per second while
// parsing runs.
//
// Parameters:
//   - taskId: implementation ID of the generated task.
//   - parser: the parser providing log source, grouping and per-log parsing.
//   - isDefaultFeature: whether this feature is enabled by default on the frontend.
//   - availableInspectionTypes: inspection types this feature is available for.
//   - labelOpts: additional task labels appended after the feature label.
func NewParserTaskFromParser(taskId taskid.TaskImplementationID[struct{}], parser Parser, isDefaultFeature bool, availableInspectionTypes []string, labelOpts ...task.LabelOpt) task.Task[struct{}] {
	return inspection_task.NewProgressReportableInspectionTask(taskId, append(parser.Dependencies(), parser.LogTask(), inspection_task.BuilderGeneratorTaskID.Ref()), func(ctx context.Context, taskMode inspection_task_interface.InspectionTaskMode, tp *progress.TaskProgress) (struct{}, error) {
		if taskMode == inspection_task_interface.TaskModeDryRun {
			slog.DebugContext(ctx, "Skipping task because this is dry run mode")
			return struct{}{}, nil
		}
		builder := task.GetTaskResult(ctx, inspection_task.BuilderGeneratorTaskID.Ref())
		logs := task.GetTaskResult(ctx, parser.LogTask())

		// Phase 1: prepare logs, reporting preparation progress once per second.
		preparedLogCount := atomic.Int32{}
		updator := progress.NewProgressUpdator(tp, time.Second, func(tp *progress.TaskProgress) {
			current := preparedLogCount.Load()
			tp.Percentage = float32(current) / float32(len(logs))
			tp.Message = fmt.Sprintf("%d/%d", current, len(logs))
		})
		updator.Start(ctx)
		err := builder.PrepareParseLogs(ctx, logs, func() {
			preparedLogCount.Add(1)
		})
		if err != nil {
			return struct{}{}, err
		}
		// Preparation finished; switch progress reporting over to the parsing phase.
		updator.Done()

		groups := parser.Grouper().Group(logs)
		groupNames := make([]string, 0, len(groups))
		for key := range groups {
			groupNames = append(groupNames, key)
		}

		// Counters shared with the progress goroutine must be atomic: they are
		// written by worker goroutines and read by the reporter concurrently.
		var parsedLogCount atomic.Int32
		var doneThreadCount atomic.Int32
		threadCount := 0 // written only before wg.Go calls; read-only afterwards

		// Progress reporter for the parsing phase. It is stopped via stopProgress
		// once all workers complete, so it never leaks past this task.
		progressCtx, stopProgress := context.WithCancel(ctx)
		defer stopProgress()
		go func() {
			lastLogCount := int32(0)
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-progressCtx.Done():
					return
				case <-ticker.C:
					current := parsedLogCount.Load()
					tp.Update(float32(current)/float32(len(logs)), fmt.Sprintf("%d lps(concurrency %d/%d)", current-lastLogCount, doneThreadCount.Load(), threadCount))
					lastLogCount = current
				}
			}
		}()

		// Semaphore bounding worker concurrency. Acquiring a slot happens inside
		// the select so cancellation is observed even while waiting for a slot,
		// and the channel is never closed (workers still release into it).
		limitChannel := make(chan struct{}, PARSER_MAX_THREADS)
		wg := errgroup.Group{}
	dispatch:
		for _, groupName := range groupNames {
			select {
			case <-ctx.Done():
				// Stop dispatching new groups; already-running workers observe
				// ctx themselves and are still awaited below.
				break dispatch
			case limitChannel <- struct{}{}:
				groupedLogs := groups[groupName]
				threadCount++
				wg.Go(func() error { // TODO: replace this with pkg/common/worker/pool
					defer errorreport.CheckAndReportPanic()
					defer func() {
						// Count completion and release the semaphore slot even if
						// ParseLogsByGroups panics (the panic reporter rethrows).
						doneThreadCount.Add(1)
						<-limitChannel
					}()
					// Return the error directly instead of assigning to a shared
					// variable; errgroup keeps the first non-nil error.
					return builder.ParseLogsByGroups(ctx, groupedLogs, func(logIndex int, l *log.LogEntity) *history.ChangeSet {
						cs := history.NewChangeSet(l)
						parseErr := parser.Parse(ctx, l, cs, builder)
						parsedLogCount.Add(1)
						if parseErr != nil {
							// A parse failure only skips this log: log the error
							// with the raw fields for debugging and keep going.
							yaml, yamlErr := l.Fields.ToYaml("")
							if yamlErr != nil {
								yaml = "ERROR!! failed to dump in yaml"
							}
							slog.WarnContext(ctx, fmt.Sprintf("parser end with an error\n%s", parseErr))
							slog.DebugContext(ctx, yaml)
							return nil
						}
						return cs
					})
				})
			}
		}
		if err := wg.Wait(); err != nil {
			return struct{}{}, err
		}
		return struct{}{}, nil
	},
		append([]task.LabelOpt{
			inspection_task.FeatureTaskLabel(parser.GetParserName(), parser.Description(), parser.TargetLogType(), isDefaultFeature, availableInspectionTypes...),
		}, labelOpts...)...)
}