resource_registry.go (229 lines of code) (raw):
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package daisy
import (
"fmt"
"regexp"
"strings"
"sync"
)
// baseResourceRegistry holds the bookkeeping shared by the per-type resource
// registries of a Workflow: the known resources plus the hooks used to delete,
// start and stop them.
type baseResourceRegistry struct {
// w is the owning Workflow; it is consulted for resource existence checks
// and for the forceCleanup flag.
w *Workflow
// m maps a registry key to its Resource. Keys are the names passed to
// regCreate, or fully qualified resource URLs for placeholders added by regURL.
m map[string]*Resource
// mx is locked by get, regCreate, regDelete and regUse around their accesses
// to m; delete also holds it while lazily creating a Resource's deleteMx.
mx sync.Mutex
// deleteFn performs the actual deletion of a resource; called by delete.
deleteFn func(res *Resource) DError
// startFn performs the actual start of a resource; called by start.
startFn func(res *Resource) DError
// stopFn performs the actual stop of a resource; called by stop.
stopFn func(res *Resource) DError
// typeName is the resource type label used in error messages.
typeName string
// urlRgx, when non-nil, decides whether a name handed to regDelete or regUse
// is treated as a fully qualified resource URL instead of a registry name.
urlRgx *regexp.Regexp
}
// init replaces the registry's resource map with a fresh, empty one.
func (r *baseResourceRegistry) init() {
	r.m = make(map[string]*Resource)
}
// cleanup deletes every registered resource that still needs cleaning up.
// Each deletion runs in its own goroutine and cleanup returns once all of them
// have finished. Deletion errors are printed, except those whose error type is
// resourceDNEError, which are ignored.
func (r *baseResourceRegistry) cleanup() {
	var pending sync.WaitGroup
	for key, res := range r.m {
		switch {
		case res.creator == nil:
			// Placeholder resource: nothing in this workflow created it.
			continue
		case !res.createdInWorkflow:
			// The resource wasn't created successfully.
			continue
		case res.NoCleanup && !r.w.forceCleanup:
			// Flagged to avoid cleanup, and cleanup is not being forced.
			continue
		case res.deleted:
			// Already deleted.
			continue
		}

		pending.Add(1)
		go func(key string) {
			defer pending.Done()
			err := r.delete(key)
			if err == nil || err.etype() == resourceDNEError {
				return
			}
			fmt.Println(err)
		}(key)
	}
	pending.Wait()
}
// delete removes the named resource through deleteFn and marks it as deleted.
// It returns an error if name is not in the registry or the resource has
// already been deleted. An error from deleteFn is returned unchanged and the
// resource is left unmarked.
func (r *baseResourceRegistry) delete(name string) DError {
	target, found := r.get(name)
	if !found {
		return Errf("cannot delete %s %q; does not exist in registry", r.typeName, name)
	}

	// TODO: find a better way for resource delete locking.
	// - move deleteMx out of Resource, it probably belongs in the registry.
	// The per-resource mutex is created lazily while holding the registry lock
	// so that concurrent callers end up sharing a single mutex.
	r.mx.Lock()
	if target.deleteMx == nil {
		target.deleteMx = new(sync.Mutex)
	}
	r.mx.Unlock()

	target.deleteMx.Lock()
	defer target.deleteMx.Unlock()

	if target.deleted {
		return Errf("cannot delete %q; already deleted", name)
	}
	err := r.deleteFn(target)
	if err == nil {
		target.deleted = true
	}
	return err
}
// start starts the named resource through startFn and records that the
// workflow started it. It returns an error if name is not in the registry or
// the resource is already marked as started; an error from startFn is returned
// unchanged and leaves the started/stopped flags untouched.
func (r *baseResourceRegistry) start(name string) DError {
	target, found := r.get(name)
	switch {
	case !found:
		return Errf("cannot start %s %q; does not exist in registry", r.typeName, name)
	case target.startedByWf:
		return Errf("cannot start %q; already started", name)
	}

	if err := r.startFn(target); err != nil {
		return err
	}
	target.startedByWf, target.stoppedByWf = true, false
	return nil
}
// stop stops the named resource through stopFn and records that the workflow
// stopped it. It returns an error if name is not in the registry or the
// resource is already marked as stopped; an error from stopFn is returned
// unchanged and leaves the started/stopped flags untouched.
func (r *baseResourceRegistry) stop(name string) DError {
	target, found := r.get(name)
	switch {
	case !found:
		return Errf("cannot stop %s %q; does not exist in registry", r.typeName, name)
	case target.stoppedByWf:
		return Errf("cannot stop %q; already stopped", name)
	}

	if err := r.stopFn(target); err != nil {
		return err
	}
	target.stoppedByWf, target.startedByWf = true, false
	return nil
}
// get looks up name in the registry while holding the registry lock.
// The boolean reports whether an entry was present.
func (r *baseResourceRegistry) get(name string) (*Resource, bool) {
	r.mx.Lock()
	entry, present := r.m[name]
	r.mx.Unlock()
	return entry, present
}
// regCreate registers a Step s as the creator of a resource, res, and identifies the resource by name.
//
// It returns an error when name is already present in the registry. Unless
// overWrite is set, it also returns an error when the workflow's existence
// check for res.link fails or reports that the resource already exists.
// On success res.creator is set to s and res is stored under name.
func (r *baseResourceRegistry) regCreate(name string, res *Resource, s *Step, overWrite bool) DError {
	r.mx.Lock()
	defer r.mx.Unlock()

	// No duplicates known by name.
	if existing, ok := r.m[name]; ok {
		if existing.creator == nil {
			// Placeholder entries added by regURL have no creator; report the
			// clash instead of dereferencing a nil Step.
			return Errf("cannot create %s %q; already registered as an existing resource", r.typeName, name)
		}
		return Errf("cannot create %s %q; already created by step %q", r.typeName, name, existing.creator.name)
	}

	if !overWrite {
		exists, err := r.w.resourceExists(res.link)
		if err != nil {
			return Errf("cannot create %s %q; resource lookup error: %v", r.typeName, name, err)
		}
		if exists {
			return Errf("cannot create %s %q; resource already exists", r.typeName, name)
		}
	}

	res.creator = s
	r.m[name] = res
	return nil
}
// regDelete registers a Step s as the deleter of a resource.
// The name argument can be a Daisy internal name, or a fully qualified resource URL, e.g. projects/p/global/images/i.
//
// Registration is refused when the resource cannot be resolved, when it
// already has a deleter, or when s does not transitively depend on every
// registered user of the resource and on its creator (if it has one).
func (r *baseResourceRegistry) regDelete(name string, s *Step) DError {
	r.mx.Lock()
	defer r.mx.Unlock()

	// Resolve name: URLs go through regURL (with an existence check), anything
	// else must already be in the registry.
	var target *Resource
	if r.urlRgx != nil && r.urlRgx.MatchString(name) {
		placeholder, err := r.regURL(name, true)
		if err != nil {
			return err
		}
		target = placeholder
	} else {
		entry, found := r.m[name]
		if !found {
			return Errf("missing reference for %s %q", r.typeName, name)
		}
		target = entry
	}

	// Don't dupe deletion of name.
	if target.deleter != nil {
		return Errf("cannot delete %s %q: already deleted by step %q", r.typeName, name, target.deleter.name)
	}

	// s must depend on ALL registered users and on the creator. Users are
	// checked first, the creator last.
	required := make([]*Step, 0, len(target.users)+1)
	required = append(required, target.users...)
	if target.creator != nil {
		required = append(required, target.creator)
	}
	for _, dep := range required {
		if !s.nestedDepends(dep) {
			return Errf("deleting %s %q MUST transitively depend on step %q which references %q", r.typeName, name, dep.name, name)
		}
	}

	target.deleter = s
	return nil
}
// regURL returns the registry entry for a resource identified by a fully
// qualified resource URL, e.g. projects/p/global/images/i, creating a
// placeholder entry if none exists yet.
// The placeholder is keyed by the URL itself, has no creator, is flagged
// NoCleanup, and takes the last path segment of the URL as its RealName.
// When checkExist is set and the URL is not yet registered, the workflow's
// existence check must succeed before the placeholder is added.
// regURL does not lock the registry itself.
func (r *baseResourceRegistry) regURL(url string, checkExist bool) (*Resource, DError) {
	if !strings.HasPrefix(url, "projects/") {
		return nil, Errf("partial GCE resource URL %q needs leading \"projects/PROJECT/\"", url)
	}

	if known, ok := r.m[url]; ok {
		return known, nil
	}

	if checkExist {
		exists, err := r.w.resourceExists(url)
		switch {
		case exists:
			// Resource is there; carry on and register the placeholder.
		case err != nil:
			return nil, err
		default:
			return nil, typedErrf(r.typeName+resourceDNEError, "%s does not exist", url)
		}
	}

	realName := url[strings.LastIndex(url, "/")+1:]
	placeholder := &Resource{RealName: realName, link: url, NoCleanup: true}
	r.m[url] = placeholder
	return placeholder, nil
}
// regUse registers a Step s as a user of a resource.
// The name argument can be a Daisy internal name, or a fully qualified resource URL, e.g. projects/p/global/images/i.
//
// Registration is refused when the resource cannot be resolved, when s does
// not transitively depend on the resource's creator (if it has one), or when
// a deleter is already registered, since usage must be registered before
// deletion. On success s is appended to the resource's users and the
// resource is returned.
func (r *baseResourceRegistry) regUse(name string, s *Step) (*Resource, DError) {
	r.mx.Lock()
	defer r.mx.Unlock()

	// Resolve name: URLs go through regURL (with an existence check), anything
	// else must already be in the registry.
	var target *Resource
	if r.urlRgx != nil && r.urlRgx.MatchString(name) {
		placeholder, err := r.regURL(name, true)
		if err != nil {
			return nil, err
		}
		target = placeholder
	} else {
		entry, found := r.m[name]
		if !found {
			return nil, Errf("missing reference for %s %q", r.typeName, name)
		}
		target = entry
	}

	if creator := target.creator; creator != nil && !s.nestedDepends(creator) {
		return nil, Errf("using %s %q MUST transitively depend on step %q which creates %q", r.typeName, name, creator.name, name)
	}
	if deleter := target.deleter; deleter != nil {
		return nil, Errf("using %s %q; step %q deletes %q and MUST transitively depend on this step", r.typeName, name, deleter.name, name)
	}

	// In both branches above, target is the entry stored under name.
	target.users = append(target.users, s)
	return target, nil
}
// regUseDeviceName registers a Step s as a user of a disk device resource.
// "DeviceName" is only used by DetachDisks API.
// "daisyInstanceName" represents the name in daisy workflow definition, which is a shorter name
//
// The device is resolved in one of two ways:
//   - a deviceName matching deviceNameURLRgx must already be attached to the
//     instance (checked via isDiskAttached) and is registered as a placeholder;
//   - any other deviceName must not contain "/" and must have an attachment
//     registered for daisyInstanceName.
//
// The returned bool is the isDiskAttached result and is only set on the first path.
// Registration is refused when s does not transitively depend on the
// resource's creator (if it has one) or when a deleter is already registered.
func (dr *diskRegistry) regUseDeviceName(deviceName, project, zone, instance, daisyInstanceName string, s *Step) (*Resource, bool, DError) {
	dr.mx.Lock()
	defer dr.mx.Unlock()

	var (
		target     *Resource
		isAttached bool
		err        DError
	)

	switch {
	case deviceNameURLRgx.MatchString(deviceName):
		// The device must have been attached before the workflow's execution.
		isAttached, err = isDiskAttached(dr.w.ComputeClient, deviceName, project, zone, instance)
		if err != nil {
			return nil, isAttached, err
		}
		if !isAttached {
			return nil, false, Errf("device name '%v' is not attached", deviceName)
		}
		target, err = dr.regURL(deviceName, false)
		if err != nil {
			return nil, isAttached, err
		}
	case strings.Contains(deviceName, "/"):
		return nil, isAttached, Errf("unexpected url for %s: %q", dr.typeName, deviceName)
	default:
		target, err = dr.findDiskResourceByDeviceName(deviceName, daisyInstanceName)
		if err != nil {
			return nil, isAttached, err
		}
	}

	if creator := target.creator; creator != nil && !s.nestedDepends(creator) {
		return nil, isAttached, Errf("using %s %q MUST transitively depend on step %q which creates %q", dr.typeName, deviceName, creator.name, deviceName)
	}
	if deleter := target.deleter; deleter != nil {
		return nil, isAttached, Errf("using %s %q; step %q deletes %q and MUST transitively depend on this step", dr.typeName, deviceName, deleter.name, deviceName)
	}

	target.users = append(target.users, s)
	return target, isAttached, nil
}
// findDiskResourceByDeviceName resolves the disk Resource attached under
// deviceName to the given instance.
//
// dr.attachments is consulted first by device name and then by instance; the
// matching attachment's diskName is then looked up in the registry map. An
// error is returned when any of these lookups misses, so a nil Resource is
// never returned together with a nil error.
// This method does not lock dr.mx itself; regUseDeviceName calls it with the
// lock already held.
func (dr *diskRegistry) findDiskResourceByDeviceName(deviceName, instance string) (*Resource, DError) {
	byInstance, ok := dr.attachments[deviceName]
	if !ok {
		return nil, Errf("missing registered disk attachment for device name '%v'", deviceName)
	}
	attachment, ok := byInstance[instance]
	if !ok {
		return nil, Errf("missing registered disk attachment for instance '%v'", instance)
	}
	// A bare dr.m[...] would yield nil for an unregistered disk and make the
	// caller dereference a nil Resource.
	res, ok := dr.m[attachment.diskName]
	if !ok || res == nil {
		return nil, Errf("missing reference for %s %q", dr.typeName, attachment.diskName)
	}
	return res, nil
}