pkg/model/history/builder.go (334 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package history
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log/slog"
"math/rand"
"slices"
"sort"
"strings"
"sync"
"github.com/GoogleCloudPlatform/khi/pkg/common"
"github.com/GoogleCloudPlatform/khi/pkg/inspection/ioconfig"
"github.com/GoogleCloudPlatform/khi/pkg/inspection/metadata/progress"
"github.com/GoogleCloudPlatform/khi/pkg/log"
"github.com/GoogleCloudPlatform/khi/pkg/model/binarychunk"
"github.com/GoogleCloudPlatform/khi/pkg/model/enum"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/resourceinfo"
"golang.org/x/sync/errgroup"
)
// BuilderLogWalker is called by ParseLogsByGroups once per log, with the log's
// index within its group and the log itself. It returns the ChangeSet to flush
// into the Builder, or nil when the log produces no change.
type BuilderLogWalker = func(logIndex int, l *log.LogEntity) *ChangeSet
// Builder builds History from ChangeSet obtained from parsers.
//
// Its lookup tables are common.ShardingMap instances keyed by string IDs or
// resource paths, accessed in this file through Acquire/Release shard pairs.
type Builder struct {
	// history is the History under construction; Finalize marshals it to JSON.
	history *History
	// historyLock serializes the appends to history.Logs performed by the
	// PrepareParseLogs worker goroutines.
	historyLock sync.Mutex
	// binaryChunk stores log bodies and summaries (references are kept on
	// SerializableLog) and is passed to LogAnnotation.Serialize. Finalize writes
	// its contents after the JSON section.
	binaryChunk *binarychunk.Builder
	// timelinemap maps a timeline ID to its ResourceTimeline.
	timelinemap *common.ShardingMap[*ResourceTimeline]
	// timelineBuilders maps a timeline ID to the TimelineBuilder lazily created
	// by GetTimelineBuilder.
	timelineBuilders *common.ShardingMap[*TimelineBuilder]
	// logIdToSerializableLog maps a log ID to the SerializableLog registered for
	// it by PrepareParseLogs.
	logIdToSerializableLog *common.ShardingMap[*SerializableLog]
	// historyResourceCache maps a full "#"-separated resource path to its node in
	// the history.Resources tree, so ensureResourcePath can skip the tree walk.
	historyResourceCache *common.ShardingMap[*Resource]
	// sorter orders history.Resources in sortData.
	sorter *ResourceSorter
	// ClusterResource is initialized by NewBuilder and not referenced elsewhere in
	// this file; presumably consumed by parsers — TODO confirm.
	ClusterResource *resourceinfo.Cluster
}
// NewBuilder returns an empty Builder whose binary chunk builder and compressor
// both use ioConfig.TemporaryFolder.
//
// Resources are ordered at finalization by a ResourceSorter that tries, in order:
// first-revision-time ordering for pod-binding, owner-reference and operation
// relationships; name-based ordering at indices 0 through 4 (presumably resource
// path segment positions — TODO confirm) with the listed names preferred; and
// finally UnreachableSortStrategy.
func NewBuilder(ioConfig *ioconfig.IOConfig) *Builder {
	emptyHistory := NewHistory()
	tempDir := ioConfig.TemporaryFolder
	chunkBuilder := binarychunk.NewBuilder(binarychunk.NewFileSystemGzipCompressor(tempDir), tempDir)

	timelines := common.NewShardingMap[*ResourceTimeline](common.NewSuffixShardingProvider(128, 4))
	timelineBuilders := common.NewShardingMap[*TimelineBuilder](common.NewSuffixShardingProvider(128, 4))
	serializableLogs := common.NewShardingMap[*SerializableLog](common.NewSuffixShardingProvider(128, 4))
	resourceCache := common.NewShardingMap[*Resource](common.NewSuffixShardingProvider(128, 4))
	clusterInfo := resourceinfo.NewClusterResourceInfo()

	resourceSorter := NewResourceSorter(
		// Relationship-driven ordering comes first.
		&FirstRevisionTimeSortStrategy{TargetRelationship: enum.RelationshipPodBinding},
		&FirstRevisionTimeSortStrategy{TargetRelationship: enum.RelationshipOwnerReference},
		&FirstRevisionTimeSortStrategy{TargetRelationship: enum.RelationshipOperation},
		// Name-driven ordering, with preferred names listed first.
		NewNameSortStrategy(0, []string{"@Cluster", "core/v1", "apps/v1"}),
		NewNameSortStrategy(1, []string{"node", "pod", "service", "deployment", "replicaset", "daemonset", "cronjob", "job"}),
		NewNameSortStrategy(2, []string{"kube-system"}),
		NewNameSortStrategy(3, []string{}),
		NewNameSortStrategy(4, []string{}),
		// Fallback strategy.
		&UnreachableSortStrategy{},
	)

	// historyLock is left as its zero value, which is an unlocked mutex.
	return &Builder{
		history:                emptyHistory,
		binaryChunk:            chunkBuilder,
		timelinemap:            timelines,
		timelineBuilders:       timelineBuilders,
		logIdToSerializableLog: serializableLogs,
		historyResourceCache:   resourceCache,
		ClusterResource:        clusterInfo,
		sorter:                 resourceSorter,
	}
}
// ensureResourcePath returns the Resource at resourcePath, creating every missing
// node along the way (similar to `mkdir -p`).
//
// resourcePath is split on "#"; each prefix ("a", "a#b", "a#b#c", ...) names one
// node in the tree rooted at history.Resources. A newly created node has an empty
// Timeline, no children, RelationshipChild as its relationship and the prefix as
// its FullResourcePath. Every node is recorded in historyResourceCache under its
// full path so later calls can return it without walking the tree.
//
// NOTE(review): a new node is appended to its parent's Children slice (or to
// history.Resources) while only the cache shard for the *child's* path is held.
// If this can run concurrently for sibling paths that map to different shards,
// those appends to the same parent slice would race — confirm how callers use it.
func (builder *Builder) ensureResourcePath(resourcePath string) *Resource {
	// Fast path: the full path is already cached.
	resources := builder.historyResourceCache.AcquireShard(resourcePath)
	if resource, found := resources[resourcePath]; found {
		builder.historyResourceCache.ReleaseShard(resourcePath)
		return resource
	}
	builder.historyResourceCache.ReleaseShard(resourcePath)
	// Slow path: walk the tree prefix by prefix, creating missing nodes.
	resourcePathFragment := strings.Split(resourcePath, "#")
	currentResourceContainer := &builder.history.Resources
	var currentResource *Resource
	currentPath := ""
	for _, fragment := range resourcePathFragment {
		if currentPath != "" {
			currentPath += "#"
		}
		currentPath += fragment
		resources := builder.historyResourceCache.AcquireShard(currentPath)
		if resource, found := resources[currentPath]; found {
			// Existing node: descend into its children.
			builder.historyResourceCache.ReleaseShard(currentPath)
			currentResource = resource
			currentResourceContainer = &resource.Children
			continue
		} else {
			// Missing node: attach it to the current container and cache it. The
			// shard acquired above is released only after the cache entry is written.
			nr := Resource{
				ResourceName:     fragment,
				Timeline:         "",
				Children:         []*Resource{},
				Relationship:     enum.RelationshipChild,
				FullResourcePath: currentPath,
			}
			*currentResourceContainer = append(*currentResourceContainer, &nr)
			currentResource = &nr
			currentResourceContainer = &nr.Children
			resources[currentPath] = currentResource
			builder.historyResourceCache.ReleaseShard(currentPath)
		}
	}
	return currentResource
}
// setLogSummary writes summary to the binary chunk and stores the resulting
// reference on the SerializableLog registered for logId by PrepareParseLogs.
//
// If the log already has a summary, the new one is ignored with a warning and nil
// is returned. An error is returned when no log is registered for logId or when
// writing to the binary chunk fails; otherwise nil is returned.
func (builder *Builder) setLogSummary(logId string, summary string) error {
	// Only the map is read here; the write below goes to the *SerializableLog the
	// map points to.
	serializableLogs := builder.logIdToSerializableLog.AcquireShardReadonly(logId)
	defer builder.logIdToSerializableLog.ReleaseShardReadonly(logId)
	sl, exist := serializableLogs[logId]
	if !exist {
		return fmt.Errorf("no log found %s", logId)
	}
	if sl.Summary != nil {
		slog.Warn(fmt.Sprintf("log: %s has its summary already. Ignoreing", logId))
		return nil
	}
	summaryRef, err := builder.binaryChunk.Write([]byte(summary))
	if err != nil {
		return err
	}
	sl.Summary = summaryRef
	return nil
}
// setLogAnnotations serializes annotations (passing the binary chunk to each
// LogAnnotation.Serialize) and appends the results to the SerializableLog
// registered for logId. Annotations are appended in ascending Priority() order;
// annotations with equal priority keep their relative order from the input.
//
// The caller's slice is not reordered. An error is returned when no log is
// registered for logId or when serializing an annotation fails. In the latter
// case, annotations serialized before the failure stay attached to the log.
func (builder *Builder) setLogAnnotations(logId string, annotations []LogAnnotation) error {
	serializableLogs := builder.logIdToSerializableLog.AcquireShardReadonly(logId)
	defer builder.logIdToSerializableLog.ReleaseShardReadonly(logId)
	sl, exist := serializableLogs[logId]
	if !exist {
		return fmt.Errorf("no log found %s", logId)
	}
	// Sort a copy so the caller's slice is left untouched.
	sorted := slices.Clone(annotations)
	slices.SortStableFunc(sorted, func(a LogAnnotation, b LogAnnotation) int {
		return a.Priority() - b.Priority()
	})
	for _, annotation := range sorted {
		result, err := annotation.Serialize(builder.binaryChunk)
		if err != nil {
			return err
		}
		sl.Annotations = append(sl.Annotations, result)
	}
	return nil
}
// setLogSeverity overwrites the severity of the SerializableLog registered for
// logId. It returns an error when no log is registered for logId and nil otherwise.
func (builder *Builder) setLogSeverity(logId string, severity enum.Severity) error {
	serializableLogs := builder.logIdToSerializableLog.AcquireShardReadonly(logId)
	defer builder.logIdToSerializableLog.ReleaseShardReadonly(logId)
	sl, exist := serializableLogs[logId]
	if !exist {
		return fmt.Errorf("no log found %s", logId)
	}
	sl.Severity = severity
	return nil
}
// GetTimelineBuilder returns the TimelineBuilder for the resource at resourcePath.
// It proceeds in three steps:
//   - Missing resources along the path are created first (see ensureResourcePath).
//   - If the resource has no timeline, a new one is allocated, registered and
//     appended to history.Timelines.
//   - The TimelineBuilder is created on the first request for that timeline and
//     reused afterwards.
func (builder *Builder) GetTimelineBuilder(resourcePath string) *TimelineBuilder {
	resource := builder.ensureResourcePath(resourcePath)
	if resource.Timeline == "" {
		// NOTE(review): this check-and-set on resource.Timeline is not atomic, so two
		// concurrent first calls for the same path could each allocate a timeline.
		tid, timeline := builder.registerNewTimeline()
		// history.Timelines is shared builder state; guard it with historyLock, the
		// same lock PrepareParseLogs uses for history.Logs.
		builder.historyLock.Lock()
		builder.history.Timelines = append(builder.history.Timelines, timeline)
		builder.historyLock.Unlock()
		resource.Timeline = tid
	}
	// Return the existing timeline builder when one was already created.
	timelineBuilderMap := builder.timelineBuilders.AcquireShard(resource.Timeline)
	defer builder.timelineBuilders.ReleaseShard(resource.Timeline)
	if timelineBuilder, exist := timelineBuilderMap[resource.Timeline]; exist {
		return timelineBuilder
	}
	// Otherwise create it for the registered timeline.
	timelineMap := builder.timelinemap.AcquireShard(resource.Timeline)
	defer builder.timelinemap.ReleaseShard(resource.Timeline)
	timeline := timelineMap[resource.Timeline]
	tb := newTimelineBuilder(builder, timeline)
	timelineBuilderMap[resource.Timeline] = tb
	return tb
}

// registerNewTimeline creates a timeline under a freshly generated ID that is not
// yet present in timelinemap, registers it there, and returns the ID and timeline.
// generateTimelineID is random, so an ID already in use is regenerated instead of
// silently overwriting an existing timeline.
func (builder *Builder) registerNewTimeline() (string, *ResourceTimeline) {
	for {
		tid := builder.generateTimelineID()
		timelineMap := builder.timelinemap.AcquireShard(tid)
		if _, taken := timelineMap[tid]; taken {
			builder.timelinemap.ReleaseShard(tid)
			continue
		}
		timeline := newTimeline(tid)
		timelineMap[tid] = timeline
		builder.timelinemap.ReleaseShard(tid)
		return tid, timeline
	}
}
// GetChildResources follows parentResourcePath ("#"-separated resource names) down
// the resource tree from history.Resources. At each level it takes the first
// resource whose ResourceName matches the segment, and finally returns the direct
// children of the resource it reaches. If any segment has no match, an empty,
// non-nil slice is returned.
func (builder *Builder) GetChildResources(parentResourcePath string) []*Resource {
	level := builder.history.Resources
	for _, segment := range strings.Split(parentResourcePath, "#") {
		idx := slices.IndexFunc(level, func(r *Resource) bool {
			return r.ResourceName == segment
		})
		if idx < 0 {
			return make([]*Resource, 0)
		}
		level = level[idx].Children
	}
	return level
}
// GetLog returns the SerializableLog registered for logId by PrepareParseLogs.
//
// The returned pointer is the builder's own instance, not a copy. The same pointer
// is stored in history.Logs, so mutations through it show up in the finalized
// history. An error is returned when logId was not among the consumed logs.
func (builder *Builder) GetLog(logId string) (*SerializableLog, error) {
	serializableLogs := builder.logIdToSerializableLog.AcquireShardReadonly(logId)
	defer builder.logIdToSerializableLog.ReleaseShardReadonly(logId)
	if serializedLog, found := serializableLogs[logId]; found {
		return serializedLog, nil
	}
	return nil, fmt.Errorf("log %s was not found", logId)
}
// addTimelineAlias makes the resource at destPath share the timeline of the
// resource at sourcePath. Both resource paths are created if missing.
func (builder *Builder) addTimelineAlias(sourcePath string, destPath string) {
	// GetTimelineBuilder assigns a timeline to the source resource when it has none,
	// so source.Timeline is populated by the time it is copied below.
	builder.GetTimelineBuilder(sourcePath)
	source := builder.ensureResourcePath(sourcePath)
	builder.ensureResourcePath(destPath).Timeline = source.Timeline
}
// rewriteRelationship sets the parent relationship of the resource at path,
// creating the path if missing. The change is accepted only when the resource
// still has the default RelationshipChild or already has the requested
// relationship; any other transition returns an error and leaves it unchanged.
func (builder *Builder) rewriteRelationship(path string, relationship enum.ParentRelationship) error {
	target := builder.ensureResourcePath(path)
	current := target.Relationship
	switch {
	case current == relationship, current == enum.RelationshipChild:
		target.Relationship = relationship
		return nil
	default:
		return fmt.Errorf("failed to rewrite the parentRelationship of %s. It was already rewritten to %d", path, current)
	}
}
// PrepareParseLogs prepares this builder to parse logs by groups. It registers
// every log in entireLogs as a SerializableLog keyed by its ID. Each log body is
// written to the binary chunk, and the resulting SerializableLogs are appended to
// history.Logs in no particular order (sortData orders them later).
//
// The work is split across a fixed number of goroutines, each taking every
// parallelism-th log. onLogProcessed is therefore called from several goroutines
// and must be safe for concurrent use. It is called once for each log visited,
// including duplicated IDs, which are skipped with a warning.
//
// If writing a log body fails, the other workers stop and that error is returned.
// If ctx is cancelled, the workers stop and ctx's error is returned.
func (builder *Builder) PrepareParseLogs(ctx context.Context, entireLogs []*log.LogEntity, onLogProcessed func()) error {
	const parallelism = 16
	// The derived context is cancelled as soon as any worker fails, which lets the
	// remaining workers stop early instead of writing more bodies.
	errGrp, workerCtx := errgroup.WithContext(ctx)
	for i := 0; i < parallelism; i++ {
		shard := i
		errGrp.Go(func() error {
			logs := make([]*SerializableLog, 0, len(entireLogs)/parallelism+1)
			for logIndex := shard; logIndex < len(entireLogs); logIndex += parallelism {
				if err := workerCtx.Err(); err != nil {
					return err
				}
				onLogProcessed()
				entry := entireLogs[logIndex]
				logId := entry.ID()
				serializableLogs := builder.logIdToSerializableLog.AcquireShard(logId)
				if _, found := serializableLogs[logId]; found {
					builder.logIdToSerializableLog.ReleaseShard(logId)
					slog.WarnContext(ctx, fmt.Sprintf("duplicated consumed log %s", logId))
					continue
				}
				bodyRef, err := builder.binaryChunk.Write([]byte(entry.LogBody()))
				if err != nil {
					builder.logIdToSerializableLog.ReleaseShard(logId)
					return err
				}
				severity, err := entry.Severity()
				if err != nil {
					// Severity is best-effort: logs whose severity cannot be read are kept.
					severity = enum.SeverityUnknown
				}
				sl := &SerializableLog{
					ID:          logId,
					DisplayId:   entry.GetStringOrDefault("insertId", "unknown"),
					Body:        bodyRef,
					Timestamp:   entry.Timestamp(),
					Type:        entry.LogType,
					Severity:    severity,
					Annotations: make([]any, 0),
				}
				logs = append(logs, sl)
				serializableLogs[logId] = sl
				builder.logIdToSerializableLog.ReleaseShard(logId)
			}
			builder.historyLock.Lock()
			defer builder.historyLock.Unlock()
			builder.history.Logs = append(builder.history.Logs, logs...)
			return nil
		})
	}
	return errGrp.Wait()
}
// ParseLogsByGroups calls logWalker for each log in groupedLogs, in order, and
// flushes every non-nil ChangeSet it returns into this builder. After each flush,
// the timelines of the paths reported by FlushToHistory are re-sorted; this also
// happens when the flush reports an error.
//
// Cancellation is checked before each log. When ctx is done, ctx.Err() is
// returned (context.Canceled or context.DeadlineExceeded). Otherwise the first
// error returned by FlushToHistory is returned.
func (builder *Builder) ParseLogsByGroups(ctx context.Context, groupedLogs []*log.LogEntity, logWalker BuilderLogWalker) error {
	for i, l := range groupedLogs {
		if err := ctx.Err(); err != nil {
			return err
		}
		cs := logWalker(i, l)
		if cs == nil {
			continue
		}
		changedPaths, err := cs.FlushToHistory(builder)
		for _, path := range changedPaths {
			builder.GetTimelineBuilder(path).Sort()
		}
		if err != nil {
			return err
		}
	}
	return nil
}
// sortData orders history.Resources with the configured ResourceSorter and sorts
// history.Logs by ascending timestamp. Logs with equal timestamps are ordered by
// ID, so the result does not depend on the order in which the PrepareParseLogs
// workers appended them.
func (builder *Builder) sortData() error {
	sortedResources, err := builder.sorter.SortAll(builder, builder.history.Resources)
	if err != nil {
		return err
	}
	builder.history.Resources = sortedResources
	logs := builder.history.Logs
	// sort.Slice requires a strict "less" function that reports false for equal
	// elements; the ID tie-break also makes the order deterministic.
	sort.Slice(logs, func(i, j int) bool {
		ti, tj := logs[i].Timestamp, logs[j].Timestamp
		if !ti.Equal(tj) {
			return ti.Before(tj)
		}
		return logs[i].ID < logs[j].ID
	})
	return nil
}
// Finalize sorts the collected resources and logs, then writes the history file to
// writer in this layout:
//
//	"KHI"                          3-byte magic
//	len(JSON)                      uint32, little endian
//	JSON-encoded History           includes serializedMetadata
//	binary chunk data              written by the binary chunk builder
//
// It returns the total number of bytes written. On error it returns 0 and the
// error; some bytes may already have been written to writer by then.
func (builder *Builder) Finalize(ctx context.Context, serializedMetadata map[string]interface{}, writer io.Writer, progress *progress.TaskProgress) (int, error) {
	progress.Update(0, "Sorting log entries")
	progress.MarkIndeterminate()
	builder.history.Metadata = serializedMetadata
	if err := builder.sortData(); err != nil {
		return 0, err
	}
	// json.Marshal already returns []byte; no further conversion is needed.
	jsonBytes, err := json.Marshal(builder.history)
	if err != nil {
		return 0, err
	}
	// The header stores the JSON size as a uint32; refuse to write a truncated,
	// corrupt size instead of silently wrapping around.
	if uint64(len(jsonBytes)) > uint64(^uint32(0)) {
		return 0, fmt.Errorf("serialized history is %d bytes, which exceeds the uint32 size field of the file header", len(jsonBytes))
	}
	metaFieldJsonSize := make([]byte, 4)
	binary.LittleEndian.PutUint32(metaFieldJsonSize, uint32(len(jsonBytes)))

	fileSize := 0
	for _, section := range [][]byte{[]byte("KHI"), metaFieldJsonSize, jsonBytes} {
		writtenSize, err := writer.Write(section)
		if err != nil {
			return 0, err
		}
		fileSize += writtenSize
	}
	writtenSize, err := builder.binaryChunk.Build(ctx, writer, progress)
	if err != nil {
		return 0, err
	}
	return fileSize + writtenSize, nil
}
// generateTimelineID returns a random 7-character identifier made of ASCII
// letters (either case), drawn with math/rand. Uniqueness is not checked here.
func (builder *Builder) generateTimelineID() string {
	const (
		idLength = 7
		letters  = "aAbBcCdDeEfFgGhHiIjJkKlLmMnNoOpPqQrRsStTuUvVwWxXyYzZ"
	)
	var id strings.Builder
	id.Grow(idLength)
	for n := 0; n < idLength; n++ {
		id.WriteByte(letters[rand.Intn(len(letters))])
	}
	return id.String()
}