pkg/deploy/lattice/targets_synthesizer.go (157 lines of code) (raw):

package lattice import ( "context" "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/vpclattice" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/aws-application-networking-k8s/pkg/model/core" model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" "github.com/aws/aws-application-networking-k8s/pkg/utils" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" "github.com/aws/aws-application-networking-k8s/pkg/webhook" ) const ( LatticeReadinessGateConditionType = webhook.PodReadinessGateConditionType ReadinessReasonHealthy = "Healthy" ReadinessReasonUnhealthy = "Unhealthy" ReadinessReasonUnused = "Unused" ReadinessReasonInitial = "Initial" ReadinessReasonHealthCheckUnavailable = "HealthCheckUnavailable" ReadinessReasonTargetNotFound = "TargetNotFound" ) func NewTargetsSynthesizer( log gwlog.Logger, client client.Client, tgManager TargetsManager, stack core.Stack, ) *targetsSynthesizer { return &targetsSynthesizer{ log: log, client: client, targetsManager: tgManager, stack: stack, } } type targetsSynthesizer struct { log gwlog.Logger client client.Client targetsManager TargetsManager stack core.Stack } func (t *targetsSynthesizer) Synthesize(ctx context.Context) error { var resTargets []*model.Targets err := t.stack.ListResources(&resTargets) if err != nil { t.log.Errorf(ctx, "Failed to list targets due to %s", err) } for _, targets := range resTargets { tg := &model.TargetGroup{} err := t.stack.GetResource(targets.Spec.StackTargetGroupId, tg) if err != nil { return err } err = t.targetsManager.Update(ctx, targets, tg) if err != nil { identifier := model.TgNamePrefix(tg.Spec) if tg.Status != nil && tg.Status.Id != "" { identifier = tg.Status.Id } return fmt.Errorf("failed to synthesize targets %s due to %s", identifier, err) } } return nil } func (t *targetsSynthesizer) PostSynthesize(ctx context.Context) error { var resTargets []*model.Targets err := t.stack.ListResources(&resTargets) if err != nil { t.log.Errorf(ctx, "Failed to list targets due to %s", err) } requeueNeeded := false for _, targets := range resTargets { tg := &model.TargetGroup{} err := t.stack.GetResource(targets.Spec.StackTargetGroupId, tg) if err != nil { return err } identifier := model.TgNamePrefix(tg.Spec) if tg.Status != nil && tg.Status.Id != "" { identifier = tg.Status.Id } latticeTargets, err := t.targetsManager.List(ctx, tg) if err != nil { return fmt.Errorf("failed post-synthesize targets %s, ListTargets failure: %w", identifier, err) } pending, err := t.syncStatus(ctx, targets.Spec.TargetList, latticeTargets) if err != nil { return fmt.Errorf("failed post-synthesize targets %s, condition sync failure: %w", identifier, err) } requeueNeeded = requeueNeeded || pending } if requeueNeeded { return fmt.Errorf("%w: target status still in pending", RetryErr) } return nil } func (t *targetsSynthesizer) syncStatus(ctx context.Context, modelTargets []model.Target, latticeTargets []*vpclattice.TargetSummary) (bool, error) { // Extract Lattice targets as a set latticeTargetMap := make(map[model.Target]*vpclattice.TargetSummary) for _, latticeTarget := range latticeTargets { ipPort := model.Target{ TargetIP: aws.StringValue(latticeTarget.Id), Port: aws.Int64Value(latticeTarget.Port), } latticeTargetMap[ipPort] = latticeTarget } var requeue bool for _, target := range modelTargets { // Step 0: Check if the endpoint has a valid target, and is not ready yet. if target.Ready || target.TargetRef.Name == "" { continue } // Step 1: Check if the pod has the readiness gate spec. pod := &corev1.Pod{} t.client.Get(ctx, target.TargetRef, pod) if !utils.PodHasReadinessGate(pod, LatticeReadinessGateConditionType) { continue } // Step 2: Check if the pod readiness condition exists with specific condition type. // The condition is considered false when it does not exist. cond := utils.FindPodStatusCondition(pod.Status.Conditions, LatticeReadinessGateConditionType) if cond != nil && cond.Status == corev1.ConditionTrue { continue } // Step 3: Check if the Lattice target is healthy. newCond := corev1.PodCondition{ Type: LatticeReadinessGateConditionType, Status: corev1.ConditionFalse, } targetIpPort := model.Target{ TargetIP: target.TargetIP, Port: target.Port, } // syncStatus is called at post synthesis, so we can assume: // 1. Target for the pod (eventually) exists. If the target doesn't exist, we can simply requeue. // 2. Target group will be always in use, except for ServiceExport TGs. if latticeTarget, ok := latticeTargetMap[targetIpPort]; ok { switch status := aws.StringValue(latticeTarget.Status); status { case vpclattice.TargetStatusHealthy: newCond.Status = corev1.ConditionTrue newCond.Reason = ReadinessReasonHealthy case vpclattice.TargetStatusUnavailable: // Lattice HC not turned on. Readiness is designed to work only with HC but do not block deployment on this case. newCond.Status = corev1.ConditionTrue newCond.Reason = ReadinessReasonHealthCheckUnavailable case vpclattice.TargetStatusUnused: // Since this logic is called after HTTPRoute is wired, this only happens for ServiceExport TGs. // In this case we do not have to evaluate them as Healthy, but we also do not have to requeue. newCond.Reason = ReadinessReasonUnused case vpclattice.TargetStatusInitial: requeue = true newCond.Reason = ReadinessReasonInitial default: requeue = true newCond.Reason = ReadinessReasonUnhealthy newCond.Message = fmt.Sprintf("Target health check status: %s", status) } } else { requeue = true newCond.Reason = ReadinessReasonTargetNotFound } // Step 4: Update status. utils.SetPodStatusCondition(&pod.Status.Conditions, newCond) if err := t.client.Status().Update(ctx, pod); err != nil { return requeue, err } } return requeue, nil }