pkg/networks/networks.go (166 lines of code) (raw):

/* Copyright © 2021 Google Licensed under the Apache License, version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package networks import ( "context" "fmt" "strings" "legacymigration/pkg" "legacymigration/pkg/clusters" "legacymigration/pkg/migrate" "legacymigration/pkg/operations" log "github.com/sirupsen/logrus" "google.golang.org/api/compute/v1" "google.golang.org/api/container/v1" ) type networkMigrator struct { projectID string network *compute.Network handler operations.Handler clients *pkg.Clients concurrentClusters uint16 factory func(c *container.Cluster) migrate.Migrator children []migrate.Migrator } func New( projectID string, network *compute.Network, handler operations.Handler, clients *pkg.Clients, concurrentClusters uint16, opts *clusters.Options) *networkMigrator { factory := func(c *container.Cluster) migrate.Migrator { return clusters.New(projectID, c, handler, clients, opts) } return &networkMigrator{ projectID: projectID, handler: handler, network: network, clients: clients, concurrentClusters: concurrentClusters, factory: factory, } } // ResourcePath returns the resource path for this migrator. func (m *networkMigrator) ResourcePath() string { return pkg.NetworkPath(m.projectID, m.network.Name) } // Complete finishes initializing the networkMigrator. func (m *networkMigrator) Complete(ctx context.Context) error { path := pkg.LocationPath(m.projectID, pkg.AnyLocation) resp, err := m.clients.Container.ListClusters(ctx, path) if err != nil { return fmt.Errorf("error listing Clusters for network %s: %w", m.ResourcePath(), err) } if len(resp.MissingZones) > 0 { log.Warnf("Clusters.List response is missing zones: %v", resp.MissingZones) } filteredClusters := make([]*container.Cluster, 0) for _, c := range resp.Clusters { if c.Network == m.network.Name { filteredClusters = append(filteredClusters, c) } } m.children = make([]migrate.Migrator, len(filteredClusters)) for i, c := range filteredClusters { m.children[i] = m.factory(c) } // API returns error if no GCE resource with an internal IP (e.g. Cluster) is present on the Network: // "No internal IP resources on the Network. This network does not need to be migrated." if len(m.children) == 0 { log.Warnf("Network %s contains no clusters.", m.ResourcePath()) } sem := make(chan struct{}, m.concurrentClusters) return migrate.Complete(ctx, sem, m.children...) } // Validate ensures child migrators can be run without error. func (m *networkMigrator) Validate(ctx context.Context) error { sem := make(chan struct{}, m.concurrentClusters) return migrate.Validate(ctx, sem, m.children...) } // Migrate performs the network migration and then the cluster upgrades. func (m *networkMigrator) Migrate(ctx context.Context) error { if err := operations.WaitForOperationInProgress(ctx, m.migrateNetwork, m.wait); err != nil { return err } return m.migrateClusters(ctx) } func (m *networkMigrator) migrateNetwork(ctx context.Context) error { path := m.ResourcePath() if m.network.IPv4Range == "" { log.Infof("Network %s is already a VPC network.", path) return nil } log.Infof("Switching legacy network %s to custom mode VPC network", path) op, err := m.clients.Compute.SwitchToCustomMode(ctx, m.projectID, m.network.Name) if err != nil { return fmt.Errorf("error switching legacy network %s to custom mode VPC network: %w", path, err) } w := &ComputeOperation{ ProjectID: m.projectID, Operation: op, Client: m.clients.Compute, } if err := m.handler.Wait(ctx, w); err != nil { path := w.String() return fmt.Errorf("error waiting on Operation %s: %w", path, err) } log.Infof("Network %s switched to custom mode VPC network", path) resp, err := m.clients.Compute.GetNetwork(ctx, m.projectID, m.network.Name) if err != nil { return fmt.Errorf("unable to confirm network %s was converted: %w", path, err) } if resp.IPv4Range != "" { return fmt.Errorf("network %s was not converted; Network.IPv4Range (%s) should be empty", path, resp.IPv4Range) } return nil } func (m *networkMigrator) migrateClusters(ctx context.Context) error { log.Infof("Initiate upgrades for cluster(s) on network %q", m.network.Name) sem := make(chan struct{}, m.concurrentClusters) return migrate.Migrate(ctx, sem, m.children...) } func (m *networkMigrator) wait(ctx context.Context, opID string) error { op, err := m.clients.Compute.GetGlobalOperation(ctx, m.projectID, opID) if err != nil { return err } w := &ComputeOperation{ ProjectID: m.projectID, Operation: op, Client: m.clients.Compute, } if err := m.handler.Wait(ctx, w); err != nil { return fmt.Errorf("error waiting on ongoing operation %s: %w", w.String(), err) } return nil } type ComputeOperation struct { ProjectID string Operation *compute.Operation Client pkg.ComputeService } func (o *ComputeOperation) String() string { return pkg.PathRegex.FindString(o.Operation.SelfLink) } func (o *ComputeOperation) poll(ctx context.Context) (operations.OperationStatus, error) { path := o.String() log.Debugf("Waiting for %s", path) var status operations.OperationStatus resp, err := o.Client.WaitOperation(ctx, o.ProjectID, o.Operation) if err != nil { return status, err } status = operationStatus(resp) log.Debugf("Operation %s status: %#v", path, status) return status, nil } func (o *ComputeOperation) IsFinished(ctx context.Context) (bool, error) { return operations.IsFinished(ctx, o.poll) } // operationStatus converts the status of a compute.Operation to a generic OperationStatus. func operationStatus(op *compute.Operation) operations.OperationStatus { var errs string if op.Error != nil { var arr []string for _, e := range op.Error.Errors { arr = append(arr, e.Message) } errs = strings.Join(arr, "\n") } return operations.OperationStatus{ Status: op.Status, Error: errs, } }