pkg/deploy/lattice/rule_manager.go (304 lines of code) (raw):
package lattice
import (
"context"
"errors"
"fmt"
"reflect"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/vpclattice"
pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
)
//go:generate mockgen -destination rule_manager_mock.go -package lattice github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice RuleManager
type RuleManager interface {
Upsert(ctx context.Context, modelRule *model.Rule, modelListener *model.Listener, modelSvc *model.Service) (model.RuleStatus, error)
Delete(ctx context.Context, ruleId string, serviceId string, listenerId string) error
UpdatePriorities(ctx context.Context, svcId string, listenerId string, rules []*model.Rule) error
List(ctx context.Context, serviceId string, listenerId string) ([]*vpclattice.RuleSummary, error)
Get(ctx context.Context, serviceId string, listenerId string, ruleId string) (*vpclattice.GetRuleOutput, error)
}
type defaultRuleManager struct {
log gwlog.Logger
cloud pkg_aws.Cloud
}
func NewRuleManager(
log gwlog.Logger,
cloud pkg_aws.Cloud,
) *defaultRuleManager {
return &defaultRuleManager{
log: log,
cloud: cloud,
}
}
func (r *defaultRuleManager) Get(ctx context.Context, serviceId string, listenerId string, ruleId string) (*vpclattice.GetRuleOutput, error) {
getRuleInput := vpclattice.GetRuleInput{
ListenerIdentifier: aws.String(listenerId),
ServiceIdentifier: aws.String(serviceId),
RuleIdentifier: aws.String(ruleId),
}
resp, err := r.cloud.Lattice().GetRuleWithContext(ctx, &getRuleInput)
return resp, err
}
func (r *defaultRuleManager) List(ctx context.Context, svcId string, listenerId string) ([]*vpclattice.RuleSummary, error) {
ruleListInput := vpclattice.ListRulesInput{
ServiceIdentifier: aws.String(svcId),
ListenerIdentifier: aws.String(listenerId),
}
return r.cloud.Lattice().ListRulesAsList(ctx, &ruleListInput)
}
func (r *defaultRuleManager) UpdatePriorities(ctx context.Context, svcId string, listenerId string, rules []*model.Rule) error {
var ruleUpdateList []*vpclattice.RuleUpdate
for _, rule := range rules {
ruleUpdate := vpclattice.RuleUpdate{
RuleIdentifier: aws.String(rule.Status.Id),
Priority: aws.Int64(rule.Spec.Priority),
}
ruleUpdateList = append(ruleUpdateList, &ruleUpdate)
}
// BatchUpdate rules using right priority
batchRuleInput := vpclattice.BatchUpdateRuleInput{
ServiceIdentifier: aws.String(svcId),
ListenerIdentifier: aws.String(listenerId),
Rules: ruleUpdateList,
}
_, err := r.cloud.Lattice().BatchUpdateRuleWithContext(ctx, &batchRuleInput)
if err != nil {
return fmt.Errorf("failed BatchUpdateRule %s, %s, due to %s", svcId, listenerId, err)
}
r.log.Infof(ctx, "Success BatchUpdateRule %s, %s", svcId, listenerId)
return nil
}
func (r *defaultRuleManager) buildLatticeRule(modelRule *model.Rule) (*vpclattice.GetRuleOutput, error) {
gro := vpclattice.GetRuleOutput{
IsDefault: aws.Bool(false),
Priority: aws.Int64(modelRule.Spec.Priority),
}
httpMatch := vpclattice.HttpMatch{}
updateMatchFromRule(&httpMatch, modelRule)
gro.Match = &vpclattice.RuleMatch{HttpMatch: &httpMatch}
// check if we have at least one valid target group
var hasValidTargetGroup bool
for _, tg := range modelRule.Spec.Action.TargetGroups {
if tg.LatticeTgId != model.InvalidBackendRefTgId {
hasValidTargetGroup = true
break
}
}
if hasValidTargetGroup {
var latticeTGs []*vpclattice.WeightedTargetGroup
for _, ruleTg := range modelRule.Spec.Action.TargetGroups {
// skip any invalid TGs - eventually VPC Lattice may support weighted fixed response
// and this logic can be more in line with the spec
if ruleTg.LatticeTgId == model.InvalidBackendRefTgId {
continue
}
latticeTG := vpclattice.WeightedTargetGroup{
TargetGroupIdentifier: aws.String(ruleTg.LatticeTgId),
Weight: aws.Int64(ruleTg.Weight),
}
latticeTGs = append(latticeTGs, &latticeTG)
}
gro.Action = &vpclattice.RuleAction{
Forward: &vpclattice.ForwardAction{
TargetGroups: latticeTGs,
},
}
} else {
r.log.Debugf(context.TODO(), "There are no valid target groups, defaulting to 404 Fixed response")
gro.Action = &vpclattice.RuleAction{
FixedResponse: &vpclattice.FixedResponseAction{
StatusCode: aws.Int64(model.DefaultActionFixedResponseStatusCode),
},
}
}
gro.Name = aws.String(fmt.Sprintf("k8s-%d-rule-%d", modelRule.Spec.CreateTime.Unix(), modelRule.Spec.Priority))
return &gro, nil
}
func (r *defaultRuleManager) Upsert(
ctx context.Context,
modelRule *model.Rule,
modelListener *model.Listener,
modelSvc *model.Service,
) (model.RuleStatus, error) {
if modelListener.Status == nil || modelListener.Status.Id == "" {
return model.RuleStatus{}, errors.New("listener is missing id")
}
if modelSvc.Status == nil || modelSvc.Status.Id == "" {
return model.RuleStatus{}, errors.New("model service is missing id")
}
for i, mtg := range modelRule.Spec.Action.TargetGroups {
if mtg.LatticeTgId == "" {
return model.RuleStatus{}, fmt.Errorf("rule %d action %d is missing lattice target group id", modelRule.Spec.Priority, i)
}
}
latticeServiceId := modelSvc.Status.Id
latticeListenerId := modelListener.Status.Id
// this allows us to make apples to apples comparisons with what's in Lattice already
latticeRuleFromModel, err := r.buildLatticeRule(modelRule)
if err != nil {
return model.RuleStatus{}, err
}
r.log.Debugf(ctx, "Upsert rule %s for service %s-%s and listener port %d and protocol %s",
aws.StringValue(latticeRuleFromModel.Name), latticeServiceId, latticeListenerId,
modelListener.Spec.Port, modelListener.Spec.Protocol)
lri := vpclattice.ListRulesInput{
ServiceIdentifier: aws.String(modelSvc.Status.Id),
ListenerIdentifier: aws.String(modelListener.Status.Id),
}
// TODO: fetching all rules every time is not efficient - maybe have a separate public method to prepopulate?
currentLatticeRules, err := r.cloud.Lattice().GetRulesAsList(ctx, &lri)
if err != nil {
return model.RuleStatus{}, err
}
var matchingRule *vpclattice.GetRuleOutput
for _, clr := range currentLatticeRules {
if isMatchEqual(latticeRuleFromModel, clr) {
matchingRule = clr
break
}
}
if matchingRule == nil {
return r.create(ctx, currentLatticeRules, latticeRuleFromModel, latticeServiceId, latticeListenerId)
} else {
return r.updateIfNeeded(ctx, latticeRuleFromModel, matchingRule, latticeServiceId, latticeListenerId)
}
}
func (r *defaultRuleManager) updateIfNeeded(
ctx context.Context,
ruleToUpdate *vpclattice.GetRuleOutput,
matchingRule *vpclattice.GetRuleOutput,
latticeSvcId string,
latticeListenerId string,
) (model.RuleStatus, error) {
updatedRuleStatus := model.RuleStatus{
Name: aws.StringValue(matchingRule.Name),
Arn: aws.StringValue(matchingRule.Arn),
Id: aws.StringValue(matchingRule.Id),
ListenerId: latticeListenerId,
ServiceId: latticeSvcId,
Priority: aws.Int64Value(matchingRule.Priority),
}
// we already validated Match, if Action is also the same then no updates required
updateNeeded := !reflect.DeepEqual(ruleToUpdate.Action, matchingRule.Action)
if !updateNeeded {
r.log.Debugf(ctx, "rule unchanged, no updates required")
return updatedRuleStatus, nil
}
// when we update a rule, we use the priority of the existing rule to avoid conflicts
ruleToUpdate.Priority = matchingRule.Priority
ruleToUpdate.Id = matchingRule.Id
uri := vpclattice.UpdateRuleInput{
Action: ruleToUpdate.Action,
ServiceIdentifier: aws.String(latticeSvcId),
ListenerIdentifier: aws.String(latticeListenerId),
RuleIdentifier: ruleToUpdate.Id,
Match: ruleToUpdate.Match,
Priority: ruleToUpdate.Priority,
}
_, err := r.cloud.Lattice().UpdateRuleWithContext(ctx, &uri)
if err != nil {
return model.RuleStatus{}, fmt.Errorf("failed UpdateRule %d for %s, %s due to %s",
ruleToUpdate.Priority, latticeListenerId, latticeSvcId, err)
}
r.log.Infof(ctx, "Success UpdateRule %d for %s, %s", ruleToUpdate.Priority, latticeListenerId, latticeSvcId)
return updatedRuleStatus, nil
}
func (r *defaultRuleManager) create(
ctx context.Context,
currentLatticeRules []*vpclattice.GetRuleOutput,
ruleToCreate *vpclattice.GetRuleOutput,
latticeSvcId string,
latticeListenerId string,
) (model.RuleStatus, error) {
// when we create a rule, we just pick an available priority so we can
// successfully create the rule. After all rules are created, we update
// priorities based on the order they appear in the Route. Note, this
// approach is not fully compliant with the gw spec
priority, err := r.nextAvailablePriority(currentLatticeRules)
if err != nil {
return model.RuleStatus{}, err
}
ruleToCreate.Priority = aws.Int64(priority)
cri := vpclattice.CreateRuleInput{
Action: ruleToCreate.Action,
ServiceIdentifier: aws.String(latticeSvcId),
ListenerIdentifier: aws.String(latticeListenerId),
Match: ruleToCreate.Match,
Name: ruleToCreate.Name,
Priority: ruleToCreate.Priority,
Tags: r.cloud.DefaultTags(),
}
res, err := r.cloud.Lattice().CreateRuleWithContext(ctx, &cri)
if err != nil {
return model.RuleStatus{}, fmt.Errorf("failed CreateRule %s, %s due to %s", latticeListenerId, latticeSvcId, err)
}
r.log.Infof(ctx, "Success CreateRule %s, %s", aws.StringValue(res.Name), aws.StringValue(res.Id))
return model.RuleStatus{
Name: aws.StringValue(res.Name),
Arn: aws.StringValue(res.Arn),
Id: aws.StringValue(res.Id),
ServiceId: latticeSvcId,
ListenerId: latticeListenerId,
Priority: aws.Int64Value(res.Priority),
}, nil
}
func updateMatchFromRule(httpMatch *vpclattice.HttpMatch, modelRule *model.Rule) {
// setup path based
if modelRule.Spec.PathMatchExact || modelRule.Spec.PathMatchPrefix {
matchType := vpclattice.PathMatchType{}
if modelRule.Spec.PathMatchExact {
matchType.Exact = aws.String(modelRule.Spec.PathMatchValue)
}
if modelRule.Spec.PathMatchPrefix {
matchType.Prefix = aws.String(modelRule.Spec.PathMatchValue)
}
httpMatch.PathMatch = &vpclattice.PathMatch{
Match: &matchType,
CaseSensitive: aws.Bool(true), // see PathMatchType.PathPrefix in gw spec
}
}
if modelRule.Spec.Method != "" {
httpMatch.Method = &modelRule.Spec.Method
}
for i := 0; i < len(modelRule.Spec.MatchedHeaders); i++ {
headerMatch := vpclattice.HeaderMatch{
Match: modelRule.Spec.MatchedHeaders[i].Match,
Name: modelRule.Spec.MatchedHeaders[i].Name,
CaseSensitive: aws.Bool(false), // see HTTPHeaderMatch.HTTPHeaderName in gw spec
}
httpMatch.HeaderMatches = append(httpMatch.HeaderMatches, &headerMatch)
}
}
func isMatchEqual(localRule, latticeRule *vpclattice.GetRuleOutput) bool {
// currently lattice API converts nil HeaderMatches to empty list on create
// if we're currently nil, test both just in case it gets fixed later
if localRule.Match != nil && localRule.Match.HttpMatch != nil &&
localRule.Match.HttpMatch.HeaderMatches == nil {
firstTry := reflect.DeepEqual(localRule.Match, latticeRule.Match)
if firstTry {
return true
}
// test with empty, then reset to original value
localRule.Match.HttpMatch.HeaderMatches = make([]*vpclattice.HeaderMatch, 0)
secondTry := reflect.DeepEqual(localRule.Match, latticeRule.Match)
localRule.Match.HttpMatch.HeaderMatches = nil
return secondTry
}
// otherwise we can rely on normal equality
return reflect.DeepEqual(localRule.Match, latticeRule.Match)
}
func (r *defaultRuleManager) nextAvailablePriority(latticeRules []*vpclattice.GetRuleOutput) (int64, error) {
var priorities [model.MaxRulePriority]bool
for i := 0; i < model.MaxRulePriority; i++ {
priorities[i] = false
}
for _, lr := range latticeRules {
if lr.IsDefault != nil && aws.BoolValue(lr.IsDefault) {
continue
}
// priority range is 1 -> 100
priorities[aws.Int64Value(lr.Priority)-1] = true
}
for i := 0; i < model.MaxRulePriority; i++ {
if !priorities[i] {
return int64(i + 1), nil
}
}
return 0, errors.New("no available priorities")
}
func (r *defaultRuleManager) Delete(ctx context.Context, ruleId string, serviceId string, listenerId string) error {
r.log.Debugf(ctx, "Deleting rule %s for listener %s and service %s", ruleId, listenerId, serviceId)
deleteInput := vpclattice.DeleteRuleInput{
ServiceIdentifier: aws.String(serviceId),
ListenerIdentifier: aws.String(listenerId),
RuleIdentifier: aws.String(ruleId),
}
_, err := r.cloud.Lattice().DeleteRuleWithContext(ctx, &deleteInput)
if err != nil {
return fmt.Errorf("failed DeleteRule %s/%s/%s due to %s", serviceId, listenerId, ruleId, err)
}
r.log.Infof(ctx, "Success DeleteRule %s/%s/%s", serviceId, listenerId, ruleId)
return nil
}