step.go (301 lines of code) (raw):
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package daisy
import (
"context"
"fmt"
"reflect"
"strings"
"time"
)
type stepImpl interface {
// populate modifies the step type field values.
// populate should set defaults, extend GCE partial URLs to full partial
// URLs (partial URLs including the "projects/<project>" prefix), etc.
// This should not perform value validation.
// Returns any parsing errors.
populate(ctx context.Context, s *Step) DError
validate(ctx context.Context, s *Step) DError
run(ctx context.Context, s *Step) DError
}
// Step is a single daisy workflow step.
type Step struct {
name string
w *Workflow
//Timeout description
TimeoutDescription string `json:",omitempty"`
// Time to wait for this step to complete (default 10m).
// Must be parsable by https://golang.org/pkg/time/#ParseDuration.
Timeout string `json:",omitempty"`
timeout time.Duration
// Only one of the below fields should exist for each instance of Step.
AttachDisks *AttachDisks `json:",omitempty"`
DetachDisks *DetachDisks `json:",omitempty"`
CreateDisks *CreateDisks `json:",omitempty"`
CreateForwardingRules *CreateForwardingRules `json:",omitempty"`
CreateFirewallRules *CreateFirewallRules `json:",omitempty"`
CreateImages *CreateImages `json:",omitempty"`
CreateMachineImages *CreateMachineImages `json:",omitempty"`
CreateInstances *CreateInstances `json:",omitempty"`
CreateNetworks *CreateNetworks `json:",omitempty"`
CreateSnapshots *CreateSnapshots `json:",omitempty"`
CreateSubnetworks *CreateSubnetworks `json:",omitempty"`
CreateTargetInstances *CreateTargetInstances `json:",omitempty"`
CopyGCSObjects *CopyGCSObjects `json:",omitempty"`
ResizeDisks *ResizeDisks `json:",omitempty"`
StartInstances *StartInstances `json:",omitempty"`
StopInstances *StopInstances `json:",omitempty"`
DeleteResources *DeleteResources `json:",omitempty"`
DeprecateImages *DeprecateImages `json:",omitempty"`
IncludeWorkflow *IncludeWorkflow `json:",omitempty"`
SubWorkflow *SubWorkflow `json:",omitempty"`
Suspend *Suspend `json:",omitempty"`
Resume *Resume `json:",omitempty"`
WaitForInstancesSignal *WaitForInstancesSignal `json:",omitempty"`
WaitForAnyInstancesSignal *WaitForAnyInstancesSignal `json:",omitempty"`
WaitForAvailableQuotas *WaitForAvailableQuotas `json:",omitempty"`
UpdateInstancesMetadata *UpdateInstancesMetadata `json:",omitempty"`
// Used for unit tests.
testType stepImpl
}
// NewStep creates a Step with given name and timeout with the specified workflow.
// If timeout is less or equal to zero, defaultTimeout from the workflow will be used
func NewStep(name string, w *Workflow, timeout time.Duration) *Step {
if timeout <= 0 {
return &Step{name: name, w: w, Timeout: w.DefaultTimeout}
}
return &Step{name: name, w: w, timeout: timeout}
}
// NewStepDefaultTimeout creates a Step with given name using default timeout from the workflow
func NewStepDefaultTimeout(name string, w *Workflow) *Step {
return NewStep(name, w, 0)
}
func (s *Step) stepImpl() (stepImpl, DError) {
var result stepImpl
matchCount := 0
if s.AttachDisks != nil {
matchCount++
result = s.AttachDisks
}
if s.DetachDisks != nil {
matchCount++
result = s.DetachDisks
}
if s.CreateDisks != nil {
matchCount++
result = s.CreateDisks
}
if s.CreateForwardingRules != nil {
matchCount++
result = s.CreateForwardingRules
}
if s.CreateFirewallRules != nil {
matchCount++
result = s.CreateFirewallRules
}
if s.CreateImages != nil {
matchCount++
result = s.CreateImages
}
if s.CreateMachineImages != nil {
matchCount++
result = s.CreateMachineImages
}
if s.CreateInstances != nil {
matchCount++
result = s.CreateInstances
}
if s.CreateNetworks != nil {
matchCount++
result = s.CreateNetworks
}
if s.CreateSnapshots != nil {
matchCount++
result = s.CreateSnapshots
}
if s.CreateSubnetworks != nil {
matchCount++
result = s.CreateSubnetworks
}
if s.CreateTargetInstances != nil {
matchCount++
result = s.CreateTargetInstances
}
if s.CopyGCSObjects != nil {
matchCount++
result = s.CopyGCSObjects
}
if s.ResizeDisks != nil {
matchCount++
result = s.ResizeDisks
}
if s.StartInstances != nil {
matchCount++
result = s.StartInstances
}
if s.StopInstances != nil {
matchCount++
result = s.StopInstances
}
if s.DeleteResources != nil {
matchCount++
result = s.DeleteResources
}
if s.DeprecateImages != nil {
matchCount++
result = s.DeprecateImages
}
if s.IncludeWorkflow != nil {
matchCount++
result = s.IncludeWorkflow
}
if s.SubWorkflow != nil {
matchCount++
result = s.SubWorkflow
}
if s.WaitForInstancesSignal != nil {
matchCount++
result = s.WaitForInstancesSignal
}
if s.WaitForAnyInstancesSignal != nil {
matchCount++
result = s.WaitForAnyInstancesSignal
}
if s.WaitForAvailableQuotas != nil {
matchCount++
result = s.WaitForAvailableQuotas
}
if s.UpdateInstancesMetadata != nil {
matchCount++
result = s.UpdateInstancesMetadata
}
if s.testType != nil {
matchCount++
result = s.testType
}
if s.Resume != nil {
matchCount++
result = s.Resume
}
if s.Suspend != nil {
matchCount++
result = s.Suspend
}
if matchCount == 0 {
return nil, Errf("no step type defined")
}
if matchCount > 1 {
return nil, Errf("multiple step types defined")
}
return result, nil
}
func (s *Step) depends(other *Step) bool {
if s == nil || other == nil || s.w == nil || s.w != other.w {
return false
}
deps := s.w.Dependencies
steps := s.w.Steps
q := deps[s.name]
seen := map[string]bool{}
// Do a BFS search on s's dependencies, looking for the target dependency. Don't revisit visited dependencies.
for i := 0; i < len(q); i++ {
name := q[i]
if seen[name] {
continue
}
seen[name] = true
if steps[name] == other {
return true
}
for _, dep := range deps[name] {
q = append(q, dep)
}
}
return false
}
// nestedDepends determines if s depends on other, taking into account the recursive, nested nature of
// workflows, i.e. workflows in IncludeWorkflow and SubWorkflow.
// Example: if s depends on an IncludeWorkflow whose workflow contains other, then s depends on other.
func (s *Step) nestedDepends(other *Step) bool {
sChain := s.getChain()
oChain := other.getChain()
// If sChain and oChain don't share the same root workflow, then there is no dependency relationship.
if len(sChain) == 0 || len(oChain) == 0 || sChain[0].w != oChain[0].w {
return false
}
// Find where the step chains diverge.
// A divergence in the chains indicates sibling steps, where we can check dependency.
// We want to see if s's branch depends on other's branch.
var sStep, oStep *Step
for i := 0; i < minInt(len(sChain), len(oChain)); i++ {
sStep = sChain[i]
oStep = oChain[i]
if sStep != oStep {
break
}
}
return sStep.depends(oStep)
}
// getChain returns the step chain getting to a step. A link in the chain represents an IncludeWorkflow step, a
// SubWorkflow step, or the step itself.
// For example, workflow A has a step s1 which includes workflow B. B has a step s2 which subworkflows C. Finally,
// C has a step s3. s3.getChain() will return []*Step{s1, s2, s3}
func (s *Step) getChain() []*Step {
if s == nil || s.w == nil {
return nil
}
if s.w.parent == nil {
return []*Step{s}
}
for _, st := range s.w.parent.Steps {
if st.IncludeWorkflow != nil && st.IncludeWorkflow.Workflow == s.w {
return append(st.getChain(), s)
}
if st.SubWorkflow != nil && st.SubWorkflow.Workflow == s.w {
return append(st.getChain(), s)
}
}
// We shouldn't get here.
return nil
}
func (s *Step) populate(ctx context.Context) DError {
s.w.LogWorkflowInfo("Populating step %q", s.name)
impl, err := s.stepImpl()
if err != nil {
return s.wrapPopulateError(err)
}
if err = impl.populate(ctx, s); err != nil {
err = s.wrapPopulateError(err)
}
return err
}
func (s *Step) recordStepTime(startTime time.Time) {
endTime := time.Now()
s.w.recordStepTime(s.name, startTime, endTime)
}
func (s *Step) run(ctx context.Context) DError {
startTime := time.Now()
defer s.recordStepTime(startTime)
impl, err := s.stepImpl()
if err != nil {
return s.wrapRunError(err)
}
var st string
if t := reflect.TypeOf(impl); t.Kind() == reflect.Ptr {
st = t.Elem().Name()
} else {
st = t.Name()
}
s.w.LogWorkflowInfo("Running step %q (%s)", s.name, st)
if err = impl.run(ctx, s); err != nil {
return s.wrapRunError(err)
}
select {
case <-s.w.Cancel:
// return an error to indicate a canceled workflow is not 'success'
return s.w.onStepCancel(s, st)
default:
s.w.LogWorkflowInfo("Step %q (%s) successfully finished.", s.name, st)
}
return nil
}
func (s *Step) validate(ctx context.Context) DError {
s.w.LogWorkflowInfo("Validating step %q", s.name)
if !rfc1035Rgx.MatchString(strings.ToLower(s.name)) {
return s.wrapValidateError(Errf("step name must start with a letter and only contain letters, numbers, and hyphens"))
}
impl, err := s.stepImpl()
if err != nil {
return s.wrapValidateError(err)
}
if err = impl.validate(ctx, s); err != nil {
return s.wrapValidateError(err)
}
return nil
}
func (s *Step) wrapPopulateError(e DError) DError {
return wrapErrf(e, "step %q populate error", s.name)
}
func (s *Step) wrapRunError(e DError) DError {
return wrapErrf(e, "step %q run error", s.name)
}
func (s *Step) wrapValidateError(e DError) DError {
return wrapErrf(e, "step %q validation error", s.name)
}
func (s *Step) getTimeoutError() DError {
var timeoutDescription string
if s.TimeoutDescription != "" {
timeoutDescription = fmt.Sprintf(". %s", s.TimeoutDescription)
}
return Errf("step %q did not complete within the specified timeout of %s%s", s.name, s.timeout, timeoutDescription)
}