// internal/deployers/eksapi/logs.go

package eksapi
import (
"context"
_ "embed"
"errors"
"fmt"
"slices"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/autoscaling"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/ssm"
ssmtypes "github.com/aws/aws-sdk-go-v2/service/ssm/types"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
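
// logManager gathers logs from cluster nodes and uploads them to the
// configured S3 log bucket.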
type logManager struct {
clients *awsClients
resourceID string
}
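
// deployerPhase identifies the deployer lifecycle stage ("up" or "down") that
// log gathering is running in; it is used as a path component for uploaded logs.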
type deployerPhase string

const (
	deployerPhaseUp   deployerPhase = "up"
	deployerPhaseDown deployerPhase = "down"
)
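
// NewLogManager returns a logManager that uses the given AWS clients and
// namespaces uploaded logs and temporary resources by resourceID.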
func NewLogManager(clients *awsClients, resourceID string) *logManager {
return &logManager{
clients: clients,
resourceID: resourceID,
}
}
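
// gatherLogsFromNodes gathers logs from the cluster's nodes, choosing a
// mechanism based on the deployer options: NodeDiagnostic resources for Auto
// Mode clusters, or an SSM-executed script for user-data formats known to
// support it. It is best-effort and returns nil when log gathering isn't possible.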
func (m *logManager) gatherLogsFromNodes(k8sClient *k8sClient, opts *deployerOptions, phase deployerPhase) error {
if opts.LogBucket == "" {
klog.Info("--log-bucket is empty, no logs will be gathered!")
return nil
}
if k8sClient == nil {
klog.Infof("no k8s client available, no logs will be gathered!")
return nil
}
if opts.AutoMode {
return m.gatherLogsUsingNodeDiagnostic(k8sClient, opts, phase)
}
switch opts.UserDataFormat {
case "bootstrap.sh", "nodeadm", "": // if no --user-data-format was passed, we must be using managed nodes, which default to AL-based AMIs
return m.gatherLogsUsingScript(k8sClient, opts, phase)
default:
klog.Warningf("unable to gather logs for userDataFormat: %s\n", opts.UserDataFormat)
return nil
}
}
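
// logCollectorScriptSsmDocumentContent is the embedded SSM command document
// that runs the log collection script on each node. Roughly (the authoritative
// schema lives in logs_ssm_doc.json; this sketch is illustrative only), it is
// a "Command" document accepting the s3Destination parameter passed by
// SendCommand below:
//
//	{
//	  "schemaVersion": "2.2",
//	  "parameters": {"s3Destination": {"type": "String"}},
//	  "mainSteps": [{"action": "aws:runShellScript", ...}]
//	}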
//go:embed logs_ssm_doc.json
var logCollectorScriptSsmDocumentContent string
const logCollectorSsmDocumentTimeout = 5 * time.Minute
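
// gatherLogsUsingScript runs the embedded SSM log-collector document on every
// node instance and waits for each command invocation to finish. Logs land in
// s3://<log-bucket>/node-logs/<resource-id>/<phase>/.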
func (m *logManager) gatherLogsUsingScript(k8sClient *k8sClient, opts *deployerOptions, phase deployerPhase) error {
klog.Info("gathering logs using script...")
nodes, err := k8sClient.clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
if err != nil {
return err
}
var instanceIds []string
if len(nodes.Items) > 0 {
instanceIds, err = getNodeInstanceIDs(nodes.Items)
if err != nil {
return err
}
} else {
klog.Warning("no nodes found in cluster!")
// if we're using unmanaged nodes, we can track down the instances in the ASG even if they didn't join the cluster
if opts.UnmanagedNodes {
klog.Info("fetching instances from unmanaged nodegroup...")
out, err := m.clients.ASG().DescribeAutoScalingGroups(context.TODO(), &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []string{m.resourceID},
})
if err != nil {
klog.Warningf("failed to describe unmanaged nodegroup ASG: %v", err)
return nil
}
if len(out.AutoScalingGroups) != 1 {
klog.Warningf("autoscaling group not found: %s", m.resourceID)
} else {
for _, asg := range out.AutoScalingGroups {
for _, instance := range asg.Instances {
instanceIds = append(instanceIds, aws.ToString(instance.InstanceId))
}
}
}
}
}
if len(instanceIds) == 0 {
klog.Warning("no nodes to gather logs from!")
return nil
}
doc, err := m.clients.SSM().CreateDocument(context.TODO(), &ssm.CreateDocumentInput{
Content: aws.String(logCollectorScriptSsmDocumentContent),
Name: aws.String(fmt.Sprintf("%s-log-collector", m.resourceID)),
DocumentType: ssmtypes.DocumentTypeCommand,
DocumentFormat: ssmtypes.DocumentFormatJson,
})
if err != nil {
return err
}
	defer func() {
		// best-effort cleanup of the temporary log-collector document
		if _, err := m.clients.SSM().DeleteDocument(context.TODO(), &ssm.DeleteDocumentInput{
			Name: doc.DocumentDescription.Name,
		}); err != nil {
			klog.Warningf("failed to delete SSM document %s: %v", aws.ToString(doc.DocumentDescription.Name), err)
		}
	}()
command, err := m.clients.SSM().SendCommand(context.TODO(), &ssm.SendCommandInput{
DocumentName: doc.DocumentDescription.Name,
InstanceIds: instanceIds,
Parameters: map[string][]string{
"s3Destination": {fmt.Sprintf("s3://%s/node-logs/%s/%s/", opts.LogBucket, m.resourceID, phase)},
},
})
if err != nil {
return err
}
var errs []error
for _, instanceId := range instanceIds {
out, err := ssm.NewCommandExecutedWaiter(m.clients.SSM()).WaitForOutput(context.TODO(), &ssm.GetCommandInvocationInput{
CommandId: command.Command.CommandId,
InstanceId: aws.String(instanceId),
}, logCollectorSsmDocumentTimeout)
if err != nil {
errs = append(errs, err)
} else {
klog.Infof("log collection command for %s: %s", instanceId, out.Status)
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
klog.Infof("gathered logs from nodes: %v", instanceIds)
return nil
}
const logCollectorNodeDiagnosticTimeout = 5 * time.Minute
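
// gatherLogsUsingNodeDiagnostic creates a NodeDiagnostic custom resource for
// each node, pointing its log capture at a presigned S3 PUT URL, then waits
// for every diagnostic to reach a terminal state before cleaning up.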
func (m *logManager) gatherLogsUsingNodeDiagnostic(k8sClient *k8sClient, opts *deployerOptions, phase deployerPhase) error {
klog.Info("gathering logs using NodeDiagnostic...")
nodes, err := k8sClient.clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
if err != nil {
return err
}
if len(nodes.Items) == 0 {
klog.Warning("no nodes to gather logs from!")
return nil
}
instanceIds, err := getNodeInstanceIDs(nodes.Items)
if err != nil {
return err
}
var errs []error
var nodeDiagnostics []unstructured.Unstructured
for _, instanceId := range instanceIds {
presignedPut, err := m.clients.S3Presign().PresignPutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(opts.LogBucket),
Key: aws.String(fmt.Sprintf("node-logs/%s/%s/%s.tar.gz", m.resourceID, phase, instanceId)),
})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create presigned PUT for %s: %v", instanceId, err))
continue
}
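		// The equivalent manifest, for reference (shape inferred from this code):
		//
		//	apiVersion: eks.amazonaws.com/v1alpha1
		//	kind: NodeDiagnostic
		//	metadata:
		//	  name: <instance-id>
		//	spec:
		//	  logCapture:
		//	    destination: <presigned S3 PUT URL>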
nodeDiagnostic := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "eks.amazonaws.com/v1alpha1",
"kind": "NodeDiagnostic",
"metadata": v1.ObjectMeta{
Name: instanceId,
},
"spec": map[string]interface{}{
"logCapture": map[string]interface{}{
"destination": presignedPut.URL,
},
},
},
}
if err := k8sClient.client.Create(context.TODO(), &nodeDiagnostic); err != nil {
errs = append(errs, err)
} else {
nodeDiagnostics = append(nodeDiagnostics, nodeDiagnostic)
}
}
outcomes, err := m.waitForNodeDiagnostics(k8sClient, nodeDiagnostics)
if err != nil {
errs = append(errs, fmt.Errorf("failed to wait for node diagnostics: %v", err))
}
for instanceId, reasons := range outcomes {
for _, reason := range reasons {
			// treat SuccessWithErrors as a success; this isn't high stakes
if !slices.Contains([]string{"Success", "SuccessWithErrors"}, reason) {
errs = append(errs, fmt.Errorf("node diagnostic outcome reason for %s: %s", instanceId, reason))
}
}
}
for _, nodeDiagnostic := range nodeDiagnostics {
if err := k8sClient.client.Delete(context.TODO(), &nodeDiagnostic); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
klog.Infof("gathered logs from nodes: %v", instanceIds)
return nil
}
// waitForNodeDiagnostics polls each NodeDiagnostic until it reaches a terminal
// state or the timeout elapses. If no error occurs, it returns a map of
// NodeDiagnostic names to their outcome reason(s).
func (m *logManager) waitForNodeDiagnostics(k8sClient *k8sClient, nodeDiagnostics []unstructured.Unstructured) (map[string][]string, error) {
outcomes := make(map[string][]string)
err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, logCollectorNodeDiagnosticTimeout, false, func(ctx context.Context) (done bool, err error) {
for _, nodeDiagnostic := range nodeDiagnostics {
objectKey := client.ObjectKeyFromObject(&nodeDiagnostic)
if _, ok := outcomes[objectKey.Name]; ok {
// we already have an outcome for this node diagnostic
continue
}
if err := k8sClient.client.Get(ctx, objectKey, &nodeDiagnostic); err != nil {
return false, fmt.Errorf("failed to get node diagnostic: %+v: %v", objectKey, err)
}
complete, reasons := m.isNodeDiagnosticComplete(&nodeDiagnostic)
if !complete {
continue
}
outcomes[objectKey.Name] = reasons
}
if len(outcomes) == len(nodeDiagnostics) {
// we're done!
return true, nil
}
return false, nil
})
if err != nil {
return nil, err
}
return outcomes, nil
}
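
// isNodeDiagnosticComplete reports whether every capture in the NodeDiagnostic's
// status has reached a terminal state, and if so, returns the completion reason
// for each. The status is expected to look roughly like this (field names
// inferred from the nested lookups below):
//
//	status:
//	  captureStatuses:
//	  - state:
//	      completed:
//	        reason: Success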
func (m *logManager) isNodeDiagnosticComplete(nodeDiagnostic *unstructured.Unstructured) (bool, []string) {
captureStatuses, found, err := unstructured.NestedSlice(nodeDiagnostic.Object, "status", "captureStatuses")
if err != nil {
klog.Errorf("NodeDiagnostic captureStatuses does not match expected type: %+v", nodeDiagnostic)
return false, nil
}
if !found {
return false, nil
}
var reasons []string
for _, captureStatus := range captureStatuses {
captureStatusMap, ok := captureStatus.(map[string]interface{})
if !ok {
klog.Errorf("NodeDiagnostic captureStatus does not match expected type: %+v", nodeDiagnostic)
return false, nil
}
reason, found, err := unstructured.NestedString(captureStatusMap, "state", "completed", "reason")
if err != nil {
klog.Errorf("NodeDiagnostic captureStatus.reason does not match expected type: %+v", nodeDiagnostic)
return false, nil
}
if !found {
return false, nil
}
reasons = append(reasons, reason)
}
return true, reasons
}