pkg/clusters/clusters.go (181 lines of code) (raw):

/* Copyright © 2021 Google Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package clusters import ( "context" "fmt" "legacymigration/pkg" "legacymigration/pkg/migrate" "legacymigration/pkg/operations" log "github.com/sirupsen/logrus" "google.golang.org/api/container/v1" ) // Cluster and NodePoolOptions type Options struct { ConcurrentNodePools uint16 DesiredControlPlaneVersion string DesiredNodeVersion string InPlaceControlPlaneUpgrade bool } type clusterMigrator struct { projectID string cluster *container.Cluster handler operations.Handler clients *pkg.Clients opts *Options factory func(n *container.NodePool) migrate.Migrator // Field(s) populated during Complete. resolvedDesiredControlPlaneVersion string serverConfig *container.ServerConfig releaseChannel string children []migrate.Migrator } func New( projectID string, cluster *container.Cluster, handler operations.Handler, clients *pkg.Clients, opts *Options) *clusterMigrator { m := &clusterMigrator{ projectID: projectID, cluster: cluster, handler: handler, clients: clients, opts: opts, } m.factory = func(n *container.NodePool) migrate.Migrator { return NewNodePool(m, n) } return m } // Complete initializes this migrator instance. func (m *clusterMigrator) Complete(ctx context.Context) error { resp, err := m.clients.Container.ListNodePools(ctx, m.ResourcePath()) if err != nil { return fmt.Errorf("error retrieving NodePools for Cluster %s: %w", m.ResourcePath(), err) } path := pkg.LocationPath(m.projectID, m.cluster.Location) m.serverConfig, err = m.clients.Container.GetServerConfig(ctx, path) if err != nil { return fmt.Errorf("error retrieving ServerConfig for Cluster %s: %w", m.ResourcePath(), err) } m.releaseChannel = getReleaseChannel(m.cluster.ReleaseChannel) def, valid := getVersions(m.serverConfig, m.releaseChannel, ControlPlane) if m.opts.InPlaceControlPlaneUpgrade { m.resolvedDesiredControlPlaneVersion = m.cluster.CurrentMasterVersion } else { m.resolvedDesiredControlPlaneVersion, err = resolveVersion(m.opts.DesiredControlPlaneVersion, def, valid) if err != nil { return err } } m.children = make([]migrate.Migrator, len(resp.NodePools)) for i, np := range resp.NodePools { m.children[i] = m.factory(np) } log.Infof("Initialize NodePool objects for Cluster %s", m.ResourcePath()) sem := make(chan struct{}, m.opts.ConcurrentNodePools) return migrate.Complete(ctx, sem, m.children...) } // Validate confirms that this an any child migrators are valid. func (m *clusterMigrator) Validate(ctx context.Context) error { _, valid := getVersions(m.serverConfig, m.releaseChannel, ControlPlane) if err := isUpgrade(m.resolvedDesiredControlPlaneVersion, m.cluster.CurrentMasterVersion, valid, true); err != nil { return fmt.Errorf("validation error for Cluster %s: %w", m.ResourcePath(), err) } log.Infof("Upgrade for Cluster %s is valid; desired: %q (%s), current: %s", m.ResourcePath(), m.opts.DesiredControlPlaneVersion, m.resolvedDesiredControlPlaneVersion, m.cluster.CurrentMasterVersion) log.Infof("Validate NodePool upgrade(s) for Cluster %s", m.ResourcePath()) sem := make(chan struct{}, m.opts.ConcurrentNodePools) return migrate.Validate(ctx, sem, m.children...) } // Migrate performs upgrade on the Cluster func (m *clusterMigrator) Migrate(ctx context.Context) error { if err := operations.WaitForOperationInProgress(ctx, m.upgradeControlPlane, m.wait); err != nil { return err } return m.upgradeNodePools(ctx) } func (m *clusterMigrator) upgradeControlPlane(ctx context.Context) error { if m.cluster.Subnetwork != "" { log.Infof("Cluster %s does not require control plane upgrade.", m.ResourcePath()) return nil } req := &container.UpdateMasterRequest{ Name: m.ResourcePath(), MasterVersion: m.resolvedDesiredControlPlaneVersion, } log.Infof("Upgrading control plane for Cluster %q to version %q", req.Name, req.MasterVersion) op, err := m.clients.Container.UpdateMaster(ctx, req) if err != nil { return fmt.Errorf("error upgrading control plane for Cluster %s: %w", m.ResourcePath(), err) } path := pkg.PathRegex.FindString(op.SelfLink) w := &ContainerOperation{ ProjectID: m.projectID, Path: path, Client: m.clients.Container, } if err := m.handler.Wait(ctx, w); err != nil { return fmt.Errorf("error waiting on Operation %s: %w", path, err) } log.Infof("Upgraded control plane for Cluster %q to version %q", req.Name, req.MasterVersion) resp, err := m.clients.Container.GetCluster(ctx, m.ResourcePath()) if err != nil { return fmt.Errorf("unable to confirm subnetwork value for cluster %s: %w", m.ResourcePath(), err) } if resp.Subnetwork == "" { return fmt.Errorf("subnetwork field is empty for cluster %s", m.ResourcePath()) } return nil } // upgradeNodePools upgrades all Nodes for a clusters. // This is to ensure that the instance templates for the nodes func (m *clusterMigrator) upgradeNodePools(ctx context.Context) error { log.Infof("Initiate NodePool upgrades for Cluster %s", m.ResourcePath()) sem := make(chan struct{}, m.opts.ConcurrentNodePools) return migrate.Migrate(ctx, sem, m.children...) } // ResourcePath formats identifying information about the cluster. func (m *clusterMigrator) ResourcePath() string { return pkg.ClusterPath(m.projectID, m.cluster.Location, m.cluster.Name) } func (m *clusterMigrator) wait(ctx context.Context, opID string) error { name := pkg.OperationsPath(m.projectID, m.cluster.Location, opID) op, err := m.clients.Container.GetOperation(ctx, name) if err != nil { return err } opPath := pkg.PathRegex.FindString(op.SelfLink) w := &ContainerOperation{ ProjectID: m.projectID, Path: opPath, Client: m.clients.Container, } if err := m.handler.Wait(ctx, w); err != nil { return fmt.Errorf("error waiting on ongoing operation %s: %w", name, err) } return nil } type ContainerOperation struct { ProjectID string Path string Client pkg.ContainerService } func (o *ContainerOperation) String() string { return o.Path } func (o *ContainerOperation) poll(ctx context.Context) (operations.OperationStatus, error) { log.Debugf("Polling for %s", o.String()) var status operations.OperationStatus resp, err := o.Client.GetOperation(ctx, o.Path) if err != nil { return status, fmt.Errorf("error retrieving Operation %s: %w", o.Path, err) } status = operationStatus(resp) log.Debugf("Operation %s status: %#v", o.Path, status) return status, nil } func (o *ContainerOperation) IsFinished(ctx context.Context) (bool, error) { return operations.IsFinished(ctx, o.poll) } func operationStatus(op *container.Operation) operations.OperationStatus { var msg string if op.Error != nil { msg = op.Error.Message } return operations.OperationStatus{ Status: op.Status, Error: msg, } }