step_delete_resources.go (336 lines of code) (raw):

// Copyright 2017 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package daisy import ( "context" "fmt" "net/http" "strings" "sync" "cloud.google.com/go/storage" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" ) // DeleteResources deletes GCE/GCS resources. type DeleteResources struct { Disks []string `json:",omitempty"` Images []string `json:",omitempty"` MachineImages []string `json:",omitempty"` Instances []string `json:",omitempty"` Networks []string `json:",omitempty"` Subnetworks []string `json:",omitempty"` GCSPaths []string `json:",omitempty"` Firewalls []string `json:",omitempty"` } func (d *DeleteResources) populate(ctx context.Context, s *Step) DError { for i, disk := range d.Disks { if diskURLRgx.MatchString(disk) { d.Disks[i] = extendPartialURL(disk, s.w.Project) } } for i, image := range d.Images { if imageURLRgx.MatchString(image) { d.Images[i] = extendPartialURL(image, s.w.Project) } } for i, machineImage := range d.MachineImages { if machineImageURLRgx.MatchString(machineImage) { d.MachineImages[i] = extendPartialURL(machineImage, s.w.Project) } } for i, instance := range d.Instances { if instanceURLRgx.MatchString(instance) { d.Instances[i] = extendPartialURL(instance, s.w.Project) } } for i, network := range d.Networks { if networkURLRegex.MatchString(network) { d.Networks[i] = extendPartialURL(network, s.w.Project) } } for i, subnetwork := range d.Subnetworks { if subnetworkURLRegex.MatchString(subnetwork) { d.Subnetworks[i] = extendPartialURL(subnetwork, s.w.Project) } } for i, firewall := range d.Firewalls { if firewallRuleURLRegex.MatchString(firewall) { d.Firewalls[i] = extendPartialURL(firewall, s.w.Project) } } return nil } func (d *DeleteResources) validateInstance(i string, s *Step) DError { if err := s.w.instances.regDelete(i, s); err != nil { return err } ir, _ := s.w.instances.get(i) // Get the Instance that created this instance, if any. if ir.creator != nil { //Try GA for _, createI := range (*ir.creator.CreateInstances).Instances { if createI.daisyName != i { continue } attachedDisks := createI.Disks for _, ad := range attachedDisks { if !ad.AutoDelete { continue } dName := ad.Source if ad.InitializeParams != nil { dName = ad.InitializeParams.DiskName } if err := s.w.disks.regDelete(dName, s); err != nil { return err } return nil } } //Try Beta for _, createI := range (*ir.creator.CreateInstances).InstancesBeta { if createI.daisyName != i { continue } attachedDisks := createI.Disks for _, ad := range attachedDisks { if !ad.AutoDelete { continue } dName := ad.Source if ad.InitializeParams != nil { dName = ad.InitializeParams.DiskName } if err := s.w.disks.regDelete(dName, s); err != nil { return err } return nil } } } return nil } func (d *DeleteResources) checkError(err DError, s *Step) DError { if err != nil && strings.HasSuffix(err.etype(), resourceDNEError) { s.w.LogStepInfo(s.name, "DeleteResources", "WARNING: Error validating deletion: %v", err) return nil } else if err != nil && err.etype() == imageObsoleteDeletedError { return nil } return err } func (d *DeleteResources) validate(ctx context.Context, s *Step) DError { // Instance checking. for _, i := range d.Instances { if err := d.validateInstance(i, s); d.checkError(err, s) != nil { return err } } // Disk checking. for _, disk := range d.Disks { if err := s.w.disks.regDelete(disk, s); d.checkError(err, s) != nil { return err } } // Image checking. for _, i := range d.Images { if err := s.w.images.regDelete(i, s); d.checkError(err, s) != nil { return err } } // Machine image checking. for _, i := range d.MachineImages { if err := s.w.machineImages.regDelete(i, s); d.checkError(err, s) != nil { return err } } // Network checking. for _, n := range d.Networks { if err := s.w.networks.regDelete(n, s); d.checkError(err, s) != nil { return err } } // Subnetwork checking. for _, sn := range d.Subnetworks { if err := s.w.subnetworks.regDelete(sn, s); d.checkError(err, s) != nil { return err } } // GCS path checking for _, p := range d.GCSPaths { bkt, _, err := splitGCSPath(p) if err != nil { return err } // Check if bucket exists and is writeable. writableBkts.mx.Lock() if !strIn(bkt, writableBkts.bkts) { if _, err := s.w.StorageClient.Bucket(bkt).Attrs(ctx); err != nil { return Errf("error reading bucket %q: %v", bkt, err) } tObj := s.w.StorageClient.Bucket(bkt).Object(fmt.Sprintf("daisy-validate-%s-%s", s.name, s.w.id)) w := tObj.NewWriter(ctx) if _, err := w.Write(nil); err != nil { return newErr("failed to write to GCS object when deleting resources", err) } if err := w.Close(); err != nil { return Errf("error writing to bucket %q: %v", bkt, err) } if err := tObj.Delete(ctx); err != nil { return Errf("error deleting file %+v after write validation: %v", tObj, err) } writableBkts.bkts = append(writableBkts.bkts, bkt) } writableBkts.mx.Unlock() } return nil } func recursiveGCSDelete(ctx context.Context, w *Workflow, bkt, prefix string) DError { it := w.StorageClient.Bucket(bkt).Objects(ctx, &storage.Query{Prefix: prefix}) for objAttr, err := it.Next(); err != iterator.Done; objAttr, err = it.Next() { if err != nil { return typedErr(apiError, "failed to iterate GCS object for deletion", err) } if objAttr.Size == 0 { continue } if err := w.StorageClient.Bucket(bkt).Object(objAttr.Name).Delete(ctx); err != nil { return typedErr(apiError, "failed to delete GCS object", err) } } return nil } // Waits for the whole group to run. Monitors for error and cancels. // Returns true if error should be raised, false otherwise. func waitGroup(wg *sync.WaitGroup, e chan DError, w *Workflow) (bool, DError) { go func() { wg.Wait() e <- nil }() select { case err := <-e: if err != nil { return true, err } case <-w.Cancel: return true, nil } return false, nil } func (d *DeleteResources) run(ctx context.Context, s *Step) DError { var wg sync.WaitGroup w := s.w e := make(chan DError) for _, i := range d.Instances { wg.Add(1) go func(i string) { defer wg.Done() w.LogStepInfo(s.name, "DeleteResources", "Deleting instance %q.", i) if err := w.instances.delete(i); err != nil { if err.etype() == resourceDNEError { w.LogStepInfo(s.name, "DeleteResources", "WARNING: Error deleting instance %q: %v", i, err) return } e <- err } }(i) } for _, i := range d.Images { wg.Add(1) go func(i string) { defer wg.Done() w.LogStepInfo(s.name, "DeleteResources", "Deleting image %q.", i) if err := w.images.delete(i); err != nil { if err.etype() == resourceDNEError { w.LogStepInfo(s.name, "DeleteResources", "WARNING: Error deleting image %q: %v", i, err) return } e <- err } }(i) } for _, i := range d.MachineImages { wg.Add(1) go func(i string) { defer wg.Done() w.LogStepInfo(s.name, "DeleteResources", "Deleting machine image %q.", i) if err := w.machineImages.delete(i); err != nil { if err.etype() == resourceDNEError { w.LogStepInfo(s.name, "DeleteResources", "WARNING: Error deleting machine image %q: %v", i, err) return } e <- err } }(i) } for _, p := range d.GCSPaths { wg.Add(1) go func(p string) { defer wg.Done() bkt, obj, err := splitGCSPath(p) if err != nil { e <- err return } if obj == "" || strings.HasSuffix(obj, "/") { if err := recursiveGCSDelete(ctx, s.w, bkt, obj); err != nil { e <- err } return } if err := w.StorageClient.Bucket(bkt).Object(obj).Delete(ctx); err != nil { if gErr, ok := err.(*googleapi.Error); ok && gErr.Code == http.StatusNotFound { w.LogStepInfo(s.name, "DeleteResources", "WARNING: Error deleting GCS Path %q: %v", p, err) return } e <- Errf("error deleting GCS path %q: %v", p, err) } }(p) } if abort, ret := waitGroup(&wg, e, w); abort { return ret } // Delete disks only after instances have been deleted. e = make(chan DError) for _, d := range d.Disks { wg.Add(1) go func(d string) { defer wg.Done() w.LogStepInfo(s.name, "DeleteResources", "Deleting disk %q.", d) if err := w.disks.delete(d); err != nil { if err.etype() == resourceDNEError { w.LogStepInfo(s.name, "DeleteResources", "WARNING: Error deleting disk %q: %v", d, err) return } e <- err } }(d) } // Delete firewalls after instance have been deleted for _, n := range d.Firewalls { wg.Add(1) go func(n string) { defer wg.Done() w.LogStepInfo(s.name, "DeleteResources", "Deleting firewall %q.", n) if err := w.firewallRules.delete(n); err != nil { if err.etype() == resourceDNEError { w.LogStepInfo(s.name, "DeleteResources", "WARNING: Error deleting firewall %q: %v", n, err) } e <- err } }(n) } // Delete subnetworks after firewalls. for _, sn := range d.Subnetworks { wg.Add(1) go func(sn string) { defer wg.Done() w.LogStepInfo(s.name, "DeleteResources", "Deleting subnetwork %q.", sn) if err := w.subnetworks.delete(sn); err != nil { if err.etype() == resourceDNEError { w.LogStepInfo(s.name, "DeleteResources", "WARNING: Error deleting subnetwork %q: %v", sn, err) } e <- err } }(sn) } if abort, ret := waitGroup(&wg, e, w); abort { return ret } // Delete networks after subnetworks have been deleted for _, n := range d.Networks { wg.Add(1) go func(n string) { defer wg.Done() w.LogStepInfo(s.name, "DeleteResources", "Deleting network %q.", n) if err := w.networks.delete(n); err != nil { if err.etype() == resourceDNEError { w.LogStepInfo(s.name, "DeleteResources", "WARNING: Error deleting network %q: %v", n, err) } e <- err } }(n) } _, ret := waitGroup(&wg, e, w) return ret }