pkg/deploy/stack_deployer.go (245 lines of code) (raw):

package deploy import ( "context" "fmt" "sync" "sync/atomic" "time" "sigs.k8s.io/controller-runtime/pkg/client" pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws" "github.com/aws/aws-application-networking-k8s/pkg/deploy/externaldns" "github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice" "github.com/aws/aws-application-networking-k8s/pkg/gateway" "github.com/aws/aws-application-networking-k8s/pkg/model/core" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" ) const ( TG_GC_IVL = time.Second * 30 ) type StackDeployer interface { Deploy(ctx context.Context, stack core.Stack) error } type ResourceSynthesizer interface { Synthesize(ctx context.Context) error PostSynthesize(ctx context.Context) error } // Deploy a resource stack func deploy(ctx context.Context, stack core.Stack, synthesizers []ResourceSynthesizer) error { for _, synthesizer := range synthesizers { if err := synthesizer.Synthesize(ctx); err != nil { return err } } for i := len(synthesizers) - 1; i >= 0; i-- { if err := synthesizers[i].PostSynthesize(ctx); err != nil { return err } } return nil } type latticeServiceStackDeployer struct { log gwlog.Logger cloud pkg_aws.Cloud k8sClient client.Client latticeServiceManager lattice.ServiceManager targetGroupManager lattice.TargetGroupManager targetsManager lattice.TargetsManager listenerManager lattice.ListenerManager ruleManager lattice.RuleManager dnsEndpointManager externaldns.DnsEndpointManager svcExportTgBuilder gateway.SvcExportTargetGroupModelBuilder svcBuilder gateway.LatticeServiceBuilder } var tgGcOnce sync.Once var tgGc *TgGc func NewLatticeServiceStackDeploy( log gwlog.Logger, cloud pkg_aws.Cloud, k8sClient client.Client, ) *latticeServiceStackDeployer { brTgBuilder := gateway.NewBackendRefTargetGroupBuilder(log, k8sClient) tgMgr := lattice.NewTargetGroupManager(log, cloud) tgSvcExpBuilder := gateway.NewSvcExportTargetGroupBuilder(log, k8sClient) svcBuilder := gateway.NewLatticeServiceBuilder(log, k8sClient, brTgBuilder) tgGcOnce.Do(func() { // TODO: need to refactor TG synthesizer. Remove stack from constructor // arguments and use it as Synth argument. That will help with Synth // reuse for GC purposes tgGcSynth := lattice.NewTargetGroupSynthesizer(log, cloud, k8sClient, tgMgr, tgSvcExpBuilder, svcBuilder, nil) tgGcFn := NewTgGcFn(tgGcSynth) tgGc = &TgGc{ lock: sync.RWMutex{}, log: log.Named("tg-gc"), ctx: context.TODO(), isDone: atomic.Bool{}, ivl: TG_GC_IVL, cycleFn: tgGcFn, } tgGc.start() }) return &latticeServiceStackDeployer{ log: log, cloud: cloud, k8sClient: k8sClient, latticeServiceManager: lattice.NewServiceManager(log, cloud), targetGroupManager: tgMgr, targetsManager: lattice.NewTargetsManager(log, cloud), listenerManager: lattice.NewListenerManager(log, cloud), ruleManager: lattice.NewRuleManager(log, cloud), dnsEndpointManager: externaldns.NewDnsEndpointManager(log, k8sClient), svcExportTgBuilder: tgSvcExpBuilder, svcBuilder: svcBuilder, } } type TgGcCycleFn = func(context.Context) (TgGcResult, error) func NewTgGcFn(tgSynth *lattice.TargetGroupSynthesizer) TgGcCycleFn { return func(ctx context.Context) (TgGcResult, error) { t0 := time.Now() results, err := tgSynth.SynthesizeUnusedDelete(ctx) if err != nil { return TgGcResult{}, err } succ := 0 for _, res := range results { if res.Err == nil { succ += 1 } } return TgGcResult{ att: len(results), succ: succ, duration: time.Since(t0), }, nil } } type TgGc struct { lock sync.RWMutex log gwlog.Logger ctx context.Context isDone atomic.Bool ivl time.Duration cycleFn TgGcCycleFn } type TgGcResult struct { // number deletion attempts att int // number of successful deletions succ int // cycle duration duration time.Duration } func (gc *TgGc) start() { ticker := time.NewTicker(gc.ivl) go func() { for { select { case <-gc.ctx.Done(): gc.log.Info(context.TODO(), "stop GC, ctx is done") gc.isDone.Store(true) return case <-ticker.C: gc.cycle() } } }() } func (gc *TgGc) cycle() { defer func() { if r := recover(); r != nil { gc.log.Errorf(context.TODO(), "gc cycle panic: %s", r) } gc.lock.Unlock() }() gc.lock.Lock() res, err := gc.cycleFn(gc.ctx) if err != nil { gc.log.Debugf(context.TODO(), "gc cycle error: %s", err) } gc.log.Debugw(context.TODO(), "gc stats", "delete_attempts", res.att, "delete_success", res.succ, "duration", res.duration, ) } func (d *latticeServiceStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { targetGroupSynthesizer := lattice.NewTargetGroupSynthesizer(d.log, d.cloud, d.k8sClient, d.targetGroupManager, d.svcExportTgBuilder, d.svcBuilder, stack) targetsSynthesizer := lattice.NewTargetsSynthesizer(d.log, d.k8sClient, d.targetsManager, stack) serviceSynthesizer := lattice.NewServiceSynthesizer(d.log, d.latticeServiceManager, d.dnsEndpointManager, stack) listenerSynthesizer := lattice.NewListenerSynthesizer(d.log, d.listenerManager, d.targetGroupManager, stack) ruleSynthesizer := lattice.NewRuleSynthesizer(d.log, d.ruleManager, d.targetGroupManager, stack) defer func() { tgGc.lock.RUnlock() }() tgGc.lock.RLock() //Handle targetGroups creation request if err := targetGroupSynthesizer.SynthesizeCreate(ctx); err != nil { return fmt.Errorf("error during tg synthesis %w", err) } //Handle targets "reconciliation" request (register intend-to-be-registered targets and deregister intend-to-be-registered targets) if err := targetsSynthesizer.Synthesize(ctx); err != nil { return fmt.Errorf("error during target synthesis %w", err) } // Handle latticeService "reconciliation" request if err := serviceSynthesizer.Synthesize(ctx); err != nil { return fmt.Errorf("error during service synthesis %w", err) } //Handle latticeService listeners "reconciliation" request if err := listenerSynthesizer.Synthesize(ctx); err != nil { return fmt.Errorf("error during listener synthesis %w", err) } //Handle latticeService listener's rules "reconciliation" request if err := ruleSynthesizer.Synthesize(ctx); err != nil { return fmt.Errorf("error during rule synthesis %w", err) } // Handle pod status update for targets. if err := targetsSynthesizer.PostSynthesize(ctx); err != nil { return fmt.Errorf("error during target post synthesis %w", err) } //Handle targetGroup deletion request if err := targetGroupSynthesizer.SynthesizeDelete(ctx); err != nil { return fmt.Errorf("error during tg delete synthesis %w", err) } return nil } type latticeTargetGroupStackDeployer struct { log gwlog.Logger cloud pkg_aws.Cloud k8sclient client.Client targetGroupManager lattice.TargetGroupManager svcExportTgBuilder gateway.SvcExportTargetGroupModelBuilder svcBuilder gateway.LatticeServiceBuilder } // triggered by service export func NewTargetGroupStackDeploy( log gwlog.Logger, cloud pkg_aws.Cloud, k8sClient client.Client, ) *latticeTargetGroupStackDeployer { brTgBuilder := gateway.NewBackendRefTargetGroupBuilder(log, k8sClient) return &latticeTargetGroupStackDeployer{ log: log, cloud: cloud, k8sclient: k8sClient, targetGroupManager: lattice.NewTargetGroupManager(log, cloud), svcExportTgBuilder: gateway.NewSvcExportTargetGroupBuilder(log, k8sClient), svcBuilder: gateway.NewLatticeServiceBuilder(log, k8sClient, brTgBuilder), } } func (d *latticeTargetGroupStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { defer func() { tgGc.lock.RUnlock() }() tgGc.lock.RLock() synthesizers := []ResourceSynthesizer{ lattice.NewTargetGroupSynthesizer(d.log, d.cloud, d.k8sclient, d.targetGroupManager, d.svcExportTgBuilder, d.svcBuilder, stack), lattice.NewTargetsSynthesizer(d.log, d.k8sclient, lattice.NewTargetsManager(d.log, d.cloud), stack), } return deploy(ctx, stack, synthesizers) } type accessLogSubscriptionStackDeployer struct { log gwlog.Logger k8sClient client.Client manager lattice.AccessLogSubscriptionManager } func NewAccessLogSubscriptionStackDeployer( log gwlog.Logger, cloud pkg_aws.Cloud, k8sClient client.Client, ) *accessLogSubscriptionStackDeployer { return &accessLogSubscriptionStackDeployer{ log: log, k8sClient: k8sClient, manager: lattice.NewAccessLogSubscriptionManager(log, cloud), } } func (d *accessLogSubscriptionStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { synthesizers := []ResourceSynthesizer{ lattice.NewAccessLogSubscriptionSynthesizer(d.log, d.k8sClient, d.manager, stack), } return deploy(ctx, stack, synthesizers) }