pkg/deploy/lattice/targets_manager.go (141 lines of code) (raw):
package lattice
import (
"context"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/service/vpclattice"
pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
"github.com/aws/aws-application-networking-k8s/pkg/utils"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
"github.com/aws/aws-sdk-go/aws"
)
const (
// Maximum allowed number of targets per each VPC Lattice RegisterTargets/DeregisterTargets API call
// https://docs.aws.amazon.com/vpc-lattice/latest/APIReference/API_RegisterTargets.html
maxTargetsPerLatticeTargetsApiCall = 100
)
//go:generate mockgen -destination targets_manager_mock.go -package lattice github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice TargetsManager
type TargetsManager interface {
List(ctx context.Context, modelTg *model.TargetGroup) ([]*vpclattice.TargetSummary, error)
Update(ctx context.Context, modelTargets *model.Targets, modelTg *model.TargetGroup) error
}
type defaultTargetsManager struct {
log gwlog.Logger
cloud pkg_aws.Cloud
}
func NewTargetsManager(
log gwlog.Logger,
cloud pkg_aws.Cloud,
) *defaultTargetsManager {
return &defaultTargetsManager{
log: log,
cloud: cloud,
}
}
func (s *defaultTargetsManager) List(ctx context.Context, modelTg *model.TargetGroup) ([]*vpclattice.TargetSummary, error) {
lattice := s.cloud.Lattice()
listTargetsInput := vpclattice.ListTargetsInput{
TargetGroupIdentifier: &modelTg.Status.Id,
}
return lattice.ListTargetsAsList(ctx, &listTargetsInput)
}
func (s *defaultTargetsManager) Update(ctx context.Context, modelTargets *model.Targets, modelTg *model.TargetGroup) error {
if modelTg.Status == nil || modelTg.Status.Id == "" {
return errors.New("model target group is missing id")
}
if modelTargets.Spec.StackTargetGroupId != modelTg.ID() {
return fmt.Errorf("target group ID %s does not match target reference ID %s",
modelTg.ID(), modelTargets.Spec.StackTargetGroupId)
}
s.log.Debugf(ctx, "Creating targets for target group %s", modelTg.Status.Id)
latticeTargets, err := s.List(ctx, modelTg)
if err != nil {
return err
}
staleTargets := s.findStaleTargets(modelTargets, latticeTargets)
err1 := s.deregisterTargets(ctx, modelTg, staleTargets)
err2 := s.registerTargets(ctx, modelTg, modelTargets.Spec.TargetList)
return errors.Join(err1, err2)
}
func (s *defaultTargetsManager) findStaleTargets(
modelTargets *model.Targets,
listTargetsOutput []*vpclattice.TargetSummary) []model.Target {
// Disregard readiness information, and use IP/Port as key.
modelSet := utils.NewSet[model.Target]()
for _, target := range modelTargets.Spec.TargetList {
targetIpPort := model.Target{
TargetIP: target.TargetIP,
Port: target.Port,
}
modelSet.Put(targetIpPort)
}
staleTargets := make([]model.Target, 0)
for _, target := range listTargetsOutput {
ipPort := model.Target{
TargetIP: aws.StringValue(target.Id),
Port: aws.Int64Value(target.Port),
}
if aws.StringValue(target.Status) != vpclattice.TargetStatusDraining && !modelSet.Contains(ipPort) {
staleTargets = append(staleTargets, ipPort)
}
}
return staleTargets
}
func (s *defaultTargetsManager) registerTargets(
ctx context.Context,
modelTg *model.TargetGroup,
targets []model.Target,
) error {
if len(targets) == 0 {
return nil
}
latticeTargets := utils.SliceMap(targets, func(t model.Target) *vpclattice.Target {
return &vpclattice.Target{Id: &t.TargetIP, Port: &t.Port}
})
chunks := utils.Chunks(latticeTargets, maxTargetsPerLatticeTargetsApiCall)
var registerTargetsError error
for i, chunk := range chunks {
registerTargetsInput := vpclattice.RegisterTargetsInput{
TargetGroupIdentifier: &modelTg.Status.Id,
Targets: chunk,
}
resp, err := s.cloud.Lattice().RegisterTargetsWithContext(ctx, ®isterTargetsInput)
if err != nil {
registerTargetsError = errors.Join(registerTargetsError, fmt.Errorf("failed to register targets from VPC Lattice Target Group %s due to %s", modelTg.Status.Id, err))
}
if len(resp.Unsuccessful) > 0 {
registerTargetsError = errors.Join(registerTargetsError, fmt.Errorf("failed to register targets from VPC Lattice Target Group %s for chunk %d/%d, unsuccessful targets %v",
modelTg.Status.Id, i+1, len(chunks), resp.Unsuccessful))
}
s.log.Debugf(ctx, "Successfully registered %d targets from VPC Lattice Target Group %s for chunk %d/%d",
len(resp.Successful), modelTg.Status.Id, i+1, len(chunks))
}
return registerTargetsError
}
func (s *defaultTargetsManager) deregisterTargets(
ctx context.Context,
modelTg *model.TargetGroup,
targets []model.Target,
) error {
if len(targets) == 0 {
return nil
}
latticeTargets := utils.SliceMap(targets, func(t model.Target) *vpclattice.Target {
return &vpclattice.Target{Id: &t.TargetIP, Port: &t.Port}
})
chunks := utils.Chunks(latticeTargets, maxTargetsPerLatticeTargetsApiCall)
var deregisterTargetsError error
for i, chunk := range chunks {
deregisterTargetsInput := vpclattice.DeregisterTargetsInput{
TargetGroupIdentifier: &modelTg.Status.Id,
Targets: chunk,
}
resp, err := s.cloud.Lattice().DeregisterTargetsWithContext(ctx, &deregisterTargetsInput)
if err != nil {
deregisterTargetsError = errors.Join(deregisterTargetsError, fmt.Errorf("failed to deregister targets from VPC Lattice Target Group %s due to %s", modelTg.Status.Id, err))
}
if len(resp.Unsuccessful) > 0 {
deregisterTargetsError = errors.Join(deregisterTargetsError, fmt.Errorf("failed to deregister targets from VPC Lattice Target Group %s for chunk %d/%d, unsuccessful targets %v",
modelTg.Status.Id, i+1, len(chunks), resp.Unsuccessful))
}
s.log.Debugf(ctx, "Successfully deregistered %d targets from VPC Lattice Target Group %s for chunk %d/%d", len(resp.Successful), modelTg.Status.Id, i+1, len(chunks))
}
return deregisterTargetsError
}