// pkg/source/gcp/task/gke/autoscaler/parser.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package autoscaler
import (
"context"
"fmt"
"strings"
"github.com/GoogleCloudPlatform/khi/pkg/common"
"github.com/GoogleCloudPlatform/khi/pkg/log"
"github.com/GoogleCloudPlatform/khi/pkg/model/enum"
"github.com/GoogleCloudPlatform/khi/pkg/model/history"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/grouper"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/resourcepath"
"github.com/GoogleCloudPlatform/khi/pkg/parser"
"github.com/GoogleCloudPlatform/khi/pkg/source/gcp/inspectiontype"
gcp_task "github.com/GoogleCloudPlatform/khi/pkg/source/gcp/task"
gke_autoscaler_taskid "github.com/GoogleCloudPlatform/khi/pkg/source/gcp/task/gke/autoscaler/taskid"
"github.com/GoogleCloudPlatform/khi/pkg/task"
"github.com/GoogleCloudPlatform/khi/pkg/task/taskid"
)
// autoscalerLogParser parses GKE cluster autoscaler logs and records the
// resulting events on the timelines of the related resources
// (MIGs, nodepools, nodes and pods). It is stateless.
type autoscalerLogParser struct {
}
// TargetLogType implements parser.Parser.
// TargetLogType implements parser.Parser.
// Every log handled by this parser is classified as an autoscaler log.
func (p *autoscalerLogParser) TargetLogType() enum.LogType {
	return enum.LogTypeAutoscaler
}
// Dependencies implements parser.Parser.
// Dependencies implements parser.Parser.
// The cluster name input is required to build resource paths for the
// recorded events.
func (*autoscalerLogParser) Dependencies() []taskid.UntypedTaskReference {
	deps := []taskid.UntypedTaskReference{
		gcp_task.InputClusterNameTaskID.Ref(),
	}
	return deps
}
// Description implements parser.Parser.
// Description implements parser.Parser.
// Returns the human readable explanation shown in the parser selection UI.
func (*autoscalerLogParser) Description() string {
	return `Gather logs related to cluster autoscaler behavior to show them on the timelines of resources related to the autoscaler decision.`
}
// GetParserName implements parser.Parser.
// GetParserName implements parser.Parser.
// Returns the display name of this parser.
func (*autoscalerLogParser) GetParserName() string {
	return `Autoscaler Logs`
}
// LogTask implements parser.Parser.
// LogTask implements parser.Parser.
// The logs parsed by this parser come from the autoscaler log query task.
func (*autoscalerLogParser) LogTask() taskid.TaskReference[[]*log.LogEntity] {
	return gke_autoscaler_taskid.AutoscalerQueryTaskID.Ref()
}
// Grouper implements parser.Parser.
// Autoscaler logs are processed sequentially as a single group.
func (*autoscalerLogParser) Grouper() grouper.LogGrouper {
	return grouper.AllDependentLogGrouper
}
// Parse implements parser.Parser.
// Parse implements parser.Parser.
// It dispatches a single autoscaler log entry to the payload-specific parser
// (decision, noDecisionStatus or resultInfo) and always records an event on
// the cluster's autoscaler timeline.
func (p *autoscalerLogParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error {
	clusterName := task.GetTaskResult(ctx, gcp_task.InputClusterNameTaskID.Ref())
	// scaleUp,scaleDown,nodePoolCreated,nodePoolDeleted
	if l.Has("jsonPayload.decision") {
		err := parseDecision(ctx, clusterName, l, cs, builder)
		if err != nil {
			// Use a distinct variable for the YAML dump error; `yaml, err :=`
			// would shadow the decision parse error and the message below
			// would report the (usually nil) dump error instead.
			yaml, yamlErr := l.Fields.ToYaml("")
			if yamlErr != nil {
				yaml = "ERROR!! Failed to dump in YAML"
			}
			// %w keeps the original parse error inspectable via errors.Is/As.
			return fmt.Errorf("failed to parse decision log:\nERROR:%w\n\nSOURCE LOG:\n%s", err, yaml)
		}
	}
	if l.Has("jsonPayload.noDecisionStatus") {
		err := parseNoDecision(ctx, clusterName, l, cs, builder)
		if err != nil {
			return err
		}
	}
	if l.Has("jsonPayload.resultInfo") {
		err := parseResultInfo(ctx, clusterName, l, cs, builder)
		if err != nil {
			return err
		}
	}
	// Every autoscaler log, regardless of payload, marks the autoscaler timeline.
	cs.RecordEvent(resourcepath.Autoscaler(clusterName))
	return nil
}
// parseDecision records timeline events and a log summary for an autoscaler
// "decision" payload. A decision carries at most one of: scaleUp, scaleDown,
// nodePoolCreated or nodePoolDeleted. All decision logs are recorded with
// warning severity.
func parseDecision(ctx context.Context, clusterName string, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error {
	reader, err := l.Fields.ReaderSingle("jsonPayload.decision")
	if err != nil {
		return err
	}
	decision, err := parseDecisionFromReader(reader)
	if err != nil {
		return err
	}
	// Scale up: mark every grown MIG and every pod that triggered the growth.
	if up := decision.ScaleUp; up != nil {
		pools := []string{}
		totalRequested := 0
		for _, increased := range up.IncreasedMigs {
			cs.RecordEvent(resourcepath.Mig(clusterName, increased.Mig.Nodepool, increased.Mig.Name))
			pools = append(pools, increased.Mig.Nodepool)
			totalRequested += increased.RequestedNodes
		}
		for _, triggering := range up.TriggeringPods {
			cs.RecordEvent(resourcepath.Pod(triggering.Namespace, triggering.Name))
		}
		cs.RecordLogSummary(fmt.Sprintf("Scaling up nodepools by autoscaler: %s (requested: %d in total)", strings.Join(common.DedupStringArray(pools), ","), totalRequested))
	}
	// Scale down: mark each removed node, its MIG, and each evicted pod.
	if down := decision.ScaleDown; down != nil {
		pools := []string{}
		for _, removal := range down.NodesToBeRemoved {
			cs.RecordEvent(resourcepath.Node(removal.Node.Name))
			// NOTE(review): the MIG path is built with the node name
			// (removal.Node.Name) rather than removal.Node.Mig.Name as the MIG
			// name — confirm this is intended.
			cs.RecordEvent(resourcepath.Mig(clusterName, removal.Node.Mig.Nodepool, removal.Node.Name))
			for _, evicted := range removal.EvictedPods {
				cs.RecordEvent(resourcepath.Pod(evicted.Namespace, evicted.Name))
			}
			pools = append(pools, removal.Node.Mig.Nodepool)
		}
		cs.RecordLogSummary(fmt.Sprintf("Scaling down nodepools by autoscaler: %s (Removing %d nodes in total)", strings.Join(common.DedupStringArray(pools), ","), len(down.NodesToBeRemoved)))
	}
	// Node auto provisioning: nodepool creation.
	if created := decision.NodePoolCreated; created != nil {
		poolNames := []string{}
		for _, pool := range created.NodePools {
			cs.RecordEvent(resourcepath.Nodepool(clusterName, pool.Name))
			for _, m := range pool.Migs {
				cs.RecordEvent(resourcepath.Mig(clusterName, m.Nodepool, m.Name))
			}
			poolNames = append(poolNames, pool.Name)
		}
		cs.RecordLogSummary(fmt.Sprintf("Nodepool created by node auto provisioner: %s", strings.Join(poolNames, ",")))
	}
	// Node auto provisioning: nodepool deletion.
	if deleted := decision.NodePoolDeleted; deleted != nil {
		for _, poolName := range deleted.NodePoolNames {
			cs.RecordEvent(resourcepath.Nodepool(clusterName, poolName))
		}
		cs.RecordLogSummary(fmt.Sprintf("Nodepool deleted by node auto provisioner: %s", strings.Join(deleted.NodePoolNames, ",")))
	}
	cs.RecordLogSeverity(enum.SeverityWarning)
	return nil
}
// parseNoDecision records timeline events and a log summary for an autoscaler
// "noDecisionStatus" payload (the autoscaler evaluated the cluster but decided
// not to scale up and/or down). These logs are recorded with info severity.
func parseNoDecision(ctx context.Context, clusterName string, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error {
	jsonNoDecisionReader, err := l.Fields.ReaderSingle("jsonPayload.noDecisionStatus")
	if err != nil {
		return err
	}
	noDecision, err := parseNoDecisionFromReader(jsonNoDecisionReader)
	if err != nil {
		return err
	}
	if noScaleUp := noDecision.NoScaleUp; noScaleUp != nil {
		for _, skipped := range noScaleUp.SkippedMigs {
			cs.RecordEvent(resourcepath.Mig(clusterName, skipped.Mig.Nodepool, skipped.Mig.Name))
		}
		cs.RecordLogSummary("autoscaler decided not to scale up")
		// TODO: support unhandled migs
	}
	if noScaleDown := noDecision.NoScaleDown; noScaleDown != nil {
		// Deduplicate MIGs in first-seen order instead of ranging over a map:
		// Go randomizes map iteration, which made the recorded MIG event order
		// nondeterministic between runs.
		migOrder := []string{}
		migByID := map[string]mig{}
		for _, node := range noScaleDown.Nodes {
			cs.RecordEvent(resourcepath.Node(node.Node.Name))
			id := node.Node.Mig.Id()
			if _, seen := migByID[id]; !seen {
				migByID[id] = node.Node.Mig
				migOrder = append(migOrder, id)
			}
		}
		for _, id := range migOrder {
			m := migByID[id]
			cs.RecordEvent(resourcepath.Mig(clusterName, m.Nodepool, m.Name))
		}
		cs.RecordLogSummary("autoscaler decided not to scale down")
	}
	cs.RecordLogSeverity(enum.SeverityInfo)
	return nil
}
// parseResultInfo records a summary for an autoscaler "resultInfo" payload,
// which reports the completion status (success or error message ID) of
// previously started autoscaler events. These logs get info severity.
func parseResultInfo(ctx context.Context, clusterName string, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error {
	jsonResultInfoReader, err := l.Fields.ReaderSingle("jsonPayload.resultInfo")
	if err != nil {
		return err
	}
	resultInfo, err := parseResultInfoFromReader(jsonResultInfoReader)
	if err != nil {
		return err
	}
	// Pre-size: exactly one status string per result.
	statuses := make([]string, 0, len(resultInfo.Results))
	for _, r := range resultInfo.Results {
		status := r.EventID
		if r.ErrorMsg != nil {
			status += fmt.Sprintf("(Error:%s)", r.ErrorMsg.MessageId)
		} else {
			status += "(Success)"
		}
		statuses = append(statuses, status)
	}
	cs.RecordLogSeverity(enum.SeverityInfo)
	cs.RecordLogSummary(fmt.Sprintf("autoscaler finished events: %s", strings.Join(statuses, ",")))
	return nil
}
// Compile-time assertion that autoscalerLogParser satisfies parser.Parser.
var _ parser.Parser = (*autoscalerLogParser)(nil)

// AutoscalerParserTask registers the autoscaler log parser for all GKE-based
// cluster inspection types.
var AutoscalerParserTask = parser.NewParserTaskFromParser(gke_autoscaler_taskid.AutoscalerParserTaskID, &autoscalerLogParser{}, true, inspectiontype.GKEBasedClusterInspectionTypes)