pkg/bundle/manager.go (231 lines of code) (raw):
package bundle
import (
"context"
"fmt"
"os"
"time"
"github.com/go-logr/logr"
"golang.org/x/mod/semver"
api "github.com/aws/eks-anywhere-packages/api/v1alpha1"
"github.com/aws/eks-anywhere-packages/pkg/authenticator"
"github.com/aws/eks-anywhere-packages/pkg/config"
)
// Manager is the reconciliation entry point for package bundles and package
// bundle controllers. It is implemented in this file by bundleManager.
//
//go:generate mockgen -source manager.go -destination=mocks/manager.go -package=mocks Manager
type Manager interface {
	// ProcessBundle evaluates newBundle and updates its in-memory status to
	// match. It returns true if there are changes, in which case the caller
	// is expected to persist them (this method does not save anything).
	ProcessBundle(ctx context.Context, newBundle *api.PackageBundle) (bool, error)
	// ProcessBundleController processes the bundle controller: it checks the
	// target cluster and registry, makes sure the needed bundles exist, and
	// advances the controller's status.
	ProcessBundleController(ctx context.Context, pbc *api.PackageBundleController) error
}
// bundleManager is the Manager implementation in this package. It holds the
// collaborators the reconciliation methods call; it keeps no other state.
type bundleManager struct {
	// log receives diagnostic output; state transitions are logged at V(6).
	log logr.Logger
	// bundleClient lists and creates bundles, saves controller spec/status,
	// and reads the cluster config map and secrets.
	bundleClient Client
	// registryClient resolves the latest bundle and downloads specific bundles.
	registryClient RegistryClient
	// targetClient talks to the cluster named by a PackageBundleController
	// (server version, namespace creation, secret application).
	targetClient authenticator.TargetClusterClient
	// config supplies build information; BuildInfo.Version is used for the
	// bundle compatibility check.
	config config.Config
}
// NewBundleManager wires the given logger, clients and configuration into a
// new bundleManager and returns a pointer to it. Arguments are stored as-is;
// nothing is validated or contacted here.
func NewBundleManager(log logr.Logger, registryClient RegistryClient, bundleClient Client, targetClient authenticator.TargetClusterClient, config config.Config) *bundleManager {
	manager := new(bundleManager)
	manager.log = log
	manager.registryClient = registryClient
	manager.bundleClient = bundleClient
	manager.targetClient = targetClient
	manager.config = config
	return manager
}
// Compile-time check that *bundleManager satisfies the Manager interface.
var _ Manager = (*bundleManager)(nil)
// ProcessBundle classifies newBundle and, if its recorded state differs from
// the classification, updates the bundle in memory.
//
// The state is decided by the first rule that matches:
//
//  1. the bundle lives outside api.PackageNamespace -> Ignored
//  2. the running controller is older than the bundle's minimum -> UpgradeRequired
//  3. the bundle's version is not valid -> Invalid
//  4. otherwise -> Available
//
// When the state changes, the bundle's Spec is copied into Status.Spec, the
// new state is stored, and true is returned so the caller can persist the
// object. When the state is already correct, nothing is touched and false is
// returned. The returned error is always nil; the context is unused.
//
// The receiver is a pointer, like every other bundleManager method, so the
// manager is no longer copied on each call.
func (m *bundleManager) ProcessBundle(_ context.Context, newBundle *api.PackageBundle) (bool, error) {
	// Seed from the field so the variable takes the field's declared type.
	desired := newBundle.Status.State
	switch {
	case newBundle.Namespace != api.PackageNamespace:
		desired = api.PackageBundleStateIgnored
	case !m.isCompatibleWith(newBundle):
		desired = api.PackageBundleStateUpgradeRequired
	case !newBundle.IsValidVersion():
		desired = api.PackageBundleStateInvalid
	default:
		desired = api.PackageBundleStateAvailable
	}

	if newBundle.Status.State == desired {
		return false, nil
	}

	newBundle.Spec.DeepCopyInto(&newBundle.Status.Spec)
	newBundle.Status.State = desired
	m.log.V(6).Info("update", "bundle", newBundle.Name, "state", newBundle.Status.State)
	return true, nil
}
// isCompatibleWith reports whether the running controller may use bundle.
// Development builds (version equal to config.DEVELOPMENT) accept every
// bundle; otherwise the controller's build version must be at least the
// bundle's Spec.MinVersion under semantic-version ordering.
//
// NOTE(review): per the golang.org/x/mod/semver documentation, an invalid
// version string sorts below any valid one, so an empty or malformed
// MinVersion is treated as satisfied by a valid controller version — confirm
// that this is the intended behavior.
func (m *bundleManager) isCompatibleWith(bundle *api.PackageBundle) bool {
	running := m.config.BuildInfo.Version
	if running == config.DEVELOPMENT {
		return true
	}
	required := bundle.Spec.MinVersion
	return semver.Compare(running, required) >= 0
}
// hasBundleNamed reports whether any element of bundles has the exact name
// bundleName. A nil or empty slice yields false.
func (m *bundleManager) hasBundleNamed(bundles []api.PackageBundle, bundleName string) bool {
	// Index into the slice rather than ranging by value so each
	// PackageBundle is inspected in place.
	for i := range bundles {
		if bundles[i].Name == bundleName {
			return true
		}
	}
	return false
}
// ProcessBundleController reconciles pbc against its target cluster and the
// bundle registry.
//
// The flow is:
//
//  1. Query the target cluster's server version. On failure the controller is
//     marked Disconnected (see markDisconnected) and processing stops.
//  2. Best-effort refresh of the ECR secret (see refreshECRSecret).
//  3. Resolve the latest bundle for the cluster's major/minor version. On
//     failure the controller is marked Disconnected and processing stops.
//  4. Make sure the latest bundle exists among the known bundles.
//  5. Advance the controller's state:
//     - Active: ensure per-cluster resources and the active bundle exist;
//     move to UpgradeAvailable when the latest bundle is not the active one.
//     - UpgradeAvailable: return to Active once the active bundle is the
//     latest; otherwise keep the "<bundle> available" detail up to date.
//     - Disconnected: return to Active.
//     - anything else (including the empty state, which is forced whenever
//     Spec.ActiveBundle is empty): become Active if an active bundle is
//     set, otherwise adopt the latest bundle as the active bundle.
//
// A failure to reach the cluster or the registry is recorded on the status
// and is not returned; in both cases the method returns nil unless persisting
// the status itself fails. Other failures are returned wrapped with context.
func (m *bundleManager) ProcessBundleController(ctx context.Context, pbc *api.PackageBundleController) error {
	info, err := m.targetClient.GetServerVersion(ctx, pbc.Name)
	if err != nil {
		m.log.Error(err, "Unable to get server version")
		return m.markDisconnected(ctx, pbc, err)
	}

	m.refreshECRSecret(ctx)

	latestBundle, err := m.registryClient.LatestBundle(ctx, pbc.GetBundleURI(), info.Major, info.Minor, pbc.Name)
	if err != nil {
		m.log.Error(err, "Unable to get latest bundle")
		return m.markDisconnected(ctx, pbc, err)
	}

	allBundles, err := m.bundleClient.GetBundleList(ctx)
	if err != nil {
		return fmt.Errorf("getting bundle list: %w", err)
	}

	if !m.hasBundleNamed(allBundles, latestBundle.Name) {
		if err := m.bundleClient.CreateBundle(ctx, latestBundle); err != nil {
			return err
		}
	}

	latestBundleIsCurrentBundle := latestBundle.Name == pbc.Spec.ActiveBundle
	upgradeDetail := latestBundle.Name + " available"

	// Without an active bundle no recorded state is meaningful; clearing it
	// routes the controller through the default branch below.
	if pbc.Spec.ActiveBundle == "" {
		pbc.Status.State = ""
	}

	switch pbc.Status.State {
	case api.BundleControllerStateActive:
		if err := m.ensureClusterResources(ctx, pbc); err != nil {
			return err
		}
		if !m.ensureActiveBundle(ctx, pbc, allBundles) {
			// Already logged; leave the state alone and retry on the next pass.
			return nil
		}
		if latestBundleIsCurrentBundle {
			return nil
		}
		pbc.Status.State = api.BundleControllerStateUpgradeAvailable
		pbc.Status.Detail = upgradeDetail
		return m.saveStateChange(ctx, pbc)

	case api.BundleControllerStateUpgradeAvailable:
		if latestBundleIsCurrentBundle {
			pbc.Status.State = api.BundleControllerStateActive
			pbc.Status.Detail = ""
			return m.saveStateChange(ctx, pbc)
		}
		// Still behind: only rewrite the status when the advertised bundle changed.
		if pbc.Status.Detail != upgradeDetail {
			pbc.Status.Detail = upgradeDetail
			pbc.Spec.DeepCopyInto(&pbc.Status.Spec)
			if err := m.bundleClient.SaveStatus(ctx, pbc); err != nil {
				return fmt.Errorf("updating %s detail to %s: %w", pbc.Name, pbc.Status.Detail, err)
			}
		}
		return nil

	case api.BundleControllerStateDisconnected:
		// Both the cluster and the registry answered, so the connection is back.
		pbc.Status.State = api.BundleControllerStateActive
		pbc.Status.Detail = ""
		return m.saveStateChange(ctx, pbc)

	default:
		if pbc.Spec.ActiveBundle != "" {
			pbc.Status.State = api.BundleControllerStateActive
			pbc.Status.Detail = ""
			return m.saveStateChange(ctx, pbc)
		}
		// No active bundle chosen yet: adopt the latest one. This changes the
		// spec, so the whole object is saved rather than only the status.
		pbc.Spec.ActiveBundle = latestBundle.Name
		m.log.V(6).Info("update", "PackageBundleController", pbc.Name, "activeBundle", pbc.Spec.ActiveBundle)
		pbc.Status.Detail = ""
		if err := m.bundleClient.Save(ctx, pbc); err != nil {
			return fmt.Errorf("updating %s activeBundle to %s: %w", pbc.Name, pbc.Spec.ActiveBundle, err)
		}
		return nil
	}
}

// markDisconnected records cause on pbc and moves it to the Disconnected
// state, but only when the controller is currently Active or has no state.
// In any other state the status is left untouched. It returns an error only
// if persisting the status fails; cause itself is never returned, because the
// caller has already logged it.
func (m *bundleManager) markDisconnected(ctx context.Context, pbc *api.PackageBundleController, cause error) error {
	if pbc.Status.State != api.BundleControllerStateActive && pbc.Status.State != "" {
		return nil
	}
	pbc.Status.Detail = cause.Error()
	pbc.Status.State = api.BundleControllerStateDisconnected
	if err := m.bundleClient.SaveStatus(ctx, pbc); err != nil {
		return fmt.Errorf("updating %s status to %s: %w", pbc.Name, pbc.Status.State, err)
	}
	return nil
}

// refreshECRSecret initializes the client for the cluster named by the
// CLUSTER_NAME environment variable and asks the ECR authenticator to add its
// secret to all namespaces. The whole step is best-effort: every failure is
// logged and none is returned. On success it pauses for three seconds before
// returning.
func (m *bundleManager) refreshECRSecret(ctx context.Context) {
	if err := m.targetClient.Initialize(ctx, os.Getenv("CLUSTER_NAME")); err != nil {
		m.log.Error(err, "failed to intialize cluster client of management cluster")
	}

	restConfig, err := m.targetClient.ToRESTConfig()
	if err != nil {
		m.log.Error(err, "failed to get REST config of management cluster")
		return
	}

	auth, err := authenticator.NewECRSecret(restConfig)
	if err != nil {
		m.log.Error(err, "failed to create ECR secret authenticator")
		return
	}

	// Once we've fully removed ecr-token-refresher usage from all OS we can remove the below check.
	if err := auth.AddSecretToAllNamespace(ctx); err != nil {
		m.log.Error(err, "failed to add ECR secret to all namespaces")
		return
	}

	// NOTE(review): presumably this gives the new secret time to propagate
	// before the registry is queried — confirm the reason for the delay.
	time.Sleep(3 * time.Second)
}

// ensureClusterResources creates the cluster config map and the cluster
// namespace for pbc. When pbc does not name the cluster in the CLUSTER_NAME
// environment variable, it also applies the "aws-secret" secret to the target
// cluster, if that secret exists.
func (m *bundleManager) ensureClusterResources(ctx context.Context, pbc *api.PackageBundleController) error {
	if err := m.bundleClient.CreateClusterConfigMap(ctx, pbc.Name); err != nil {
		return fmt.Errorf("creating configmap for %s: %w", pbc.Name, err)
	}

	if err := m.targetClient.CreateClusterNamespace(ctx, pbc.GetName()); err != nil {
		return fmt.Errorf("creating workload cluster namespace eksa-packages for %s: %w", pbc.Name, err)
	}

	if pbc.GetName() == os.Getenv("CLUSTER_NAME") {
		return nil
	}

	secret, err := m.bundleClient.GetSecret(ctx, "aws-secret")
	if err != nil {
		return fmt.Errorf("getting aws secret eksa-packages:%w", err)
	}
	if secret == nil {
		return nil
	}
	if err := m.targetClient.ApplySecret(ctx, secret); err != nil {
		return fmt.Errorf("creating workload cluster secret aws-secret:%w", err)
	}
	return nil
}

// ensureActiveBundle makes sure the bundle named by pbc.Spec.ActiveBundle is
// present, downloading and re-creating it when it is missing from known. It
// returns false when the download or creation failed; the failure is logged
// here and is deliberately not surfaced as an error.
func (m *bundleManager) ensureActiveBundle(ctx context.Context, pbc *api.PackageBundleController, known []api.PackageBundle) bool {
	if len(pbc.Spec.ActiveBundle) == 0 || m.hasBundleNamed(known, pbc.Spec.ActiveBundle) {
		return true
	}

	activeBundle, err := m.registryClient.DownloadBundle(ctx, pbc.GetActiveBundleURI(), pbc.Name)
	if err != nil {
		m.log.Error(err, "Active bundle download failed", "bundle", pbc.Spec.ActiveBundle)
		return false
	}
	m.log.Info("Bundle downloaded", "bundle", pbc.Spec.ActiveBundle)

	if err := m.bundleClient.CreateBundle(ctx, activeBundle); err != nil {
		m.log.Error(err, "Recreate active bundle failed", "bundle", pbc.Spec.ActiveBundle)
		return false
	}
	m.log.Info("Bundle created", "bundle", pbc.Spec.ActiveBundle)
	return true
}

// saveStateChange persists a state transition the caller has already written
// to pbc.Status.State and pbc.Status.Detail: it logs the new state, copies
// the spec into Status.Spec, and saves the status.
func (m *bundleManager) saveStateChange(ctx context.Context, pbc *api.PackageBundleController) error {
	m.log.V(6).Info("update", "PackageBundleController", pbc.Name, "state", pbc.Status.State)
	pbc.Spec.DeepCopyInto(&pbc.Status.Spec)
	if err := m.bundleClient.SaveStatus(ctx, pbc); err != nil {
		return fmt.Errorf("updating %s status to %s: %w", pbc.Name, pbc.Status.State, err)
	}
	return nil
}