workflow.go (724 lines of code) (raw):
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package daisy describes a daisy workflow.
package daisy
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"
"cloud.google.com/go/logging"
"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/compute-daisy/compute"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
// defaultTimeout is the fallback per-step timeout string (see Workflow.DefaultTimeout).
const defaultTimeout = "10m"
// daisyBkt returns the name of the Daisy scratch bucket for project,
// creating the bucket if it does not already exist.
func daisyBkt(ctx context.Context, client *storage.Client, project string) (string, DError) {
	dBkt := strings.Replace(project, ":", "-", -1) + "-daisy-bkt"
	it := client.Buckets(ctx, project)
	for {
		bucketAttrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return "", typedErr(apiError, "failed to iterate buckets", err)
		}
		if bucketAttrs.Name == dBkt {
			// Bucket already exists; reuse it.
			return dBkt, nil
		}
	}
	if err := client.Bucket(dBkt).Create(ctx, project, nil); err != nil {
		return "", typedErr(apiError, "failed to create bucket", err)
	}
	return dBkt, nil
}
// TimeRecord holds the name and wall-clock start/end times of a single step
// execution.
type TimeRecord struct {
	// Name is the step name; steps from nested workflows are recorded with a
	// "parentWorkflow.step" prefix (see recordStepTime).
	Name      string
	StartTime time.Time
	EndTime   time.Time
}
// Var is a type with a flexible JSON representation. A Var can be represented
// by either a string, or by this struct definition. A Var that is represented
// by a string will unmarshal into the struct: {Value: <string>, Required: false, Description: ""}.
type Var struct {
	// Value is the variable's value after substitution.
	Value string
	// Required, if true, causes populate to fail when Value is empty.
	Required bool `json:",omitempty"`
	// Description is human-readable documentation for the variable.
	Description string `json:",omitempty"`
}
// UnmarshalJSON unmarshals a Var from either of its two JSON forms:
// a bare string (treated as the value) or the full struct form.
func (v *Var) UnmarshalJSON(b []byte) error {
	var asString string
	if json.Unmarshal(b, &asString) == nil {
		// Short form: a plain string is just the value.
		v.Value = asString
		return nil
	}
	// Unmarshal via an alias type so this method is not invoked recursively.
	type varAlias Var
	return json.Unmarshal(b, (*varAlias)(v))
}
// Workflow is a single Daisy workflow.
type Workflow struct {
	// Populated on New() construction.
	// Cancel is closed (via CancelWorkflow) to signal cancellation to all
	// goroutines watching it.
	Cancel     chan struct{} `json:"-"`
	isCanceled bool       // set once by CancelWorkflow; guarded by cancelMx
	cancelMx   sync.Mutex // guards isCanceled and cancelReason

	// Workflow template fields.
	// Workflow name.
	Name string `json:",omitempty"`
	// Project to run in.
	Project string `json:",omitempty"`
	// Zone to run in.
	Zone string `json:",omitempty"`
	// GCS Path to use for scratch data and write logs/results to.
	GCSPath string `json:",omitempty"`
	// Path to OAuth credentials file.
	OAuthPath string `json:",omitempty"`
	// Sources used by this workflow, map of destination to source.
	Sources map[string]string `json:",omitempty"`
	// Vars defines workflow variables, substitution is done at Workflow run time.
	Vars map[string]Var `json:",omitempty"`
	// Steps maps step name to step definition.
	Steps map[string]*Step `json:",omitempty"`
	// Map of steps to their dependencies.
	Dependencies map[string][]string `json:",omitempty"`
	// Default timeout for each step, defaults to 10m.
	// Must be parsable by https://golang.org/pkg/time/#ParseDuration.
	DefaultTimeout string        `json:",omitempty"`
	defaultTimeout time.Duration // parsed form of DefaultTimeout

	// Working fields.
	autovars              map[string]string
	workflowDir           string    // directory of the workflow file; base for relative paths
	parent                *Workflow // non-nil for sub/included workflows
	bucket                string    // GCS bucket portion of GCSPath
	scratchPath           string
	sourcesPath           string
	logsPath              string
	outsPath              string
	username              string
	externalLogging       bool
	gcsLoggingDisabled    bool
	cloudLoggingDisabled  bool
	stdoutLoggingDisabled bool
	disableCachingImages  bool
	id                    string // random suffix used to uniquify generated names
	Logger                Logger `json:"-"`
	cleanupHooks          []func() DError
	cleanupHooksMx        sync.Mutex
	recordTimeMx          sync.Mutex // guards stepTimeRecords
	stepWait              sync.WaitGroup
	logProcessHook        func(string) string // optional transform applied to log strings
	// Optional compute endpoint override.
	ComputeEndpoint    string          `json:",omitempty"`
	ComputeClient      compute.Client  `json:"-"`
	StorageClient      *storage.Client `json:"-"`
	CloudLoggingClient *logging.Client `json:"-"`
	// Resource registries. Shared with included workflows (see includeWorkflow).
	disks           *diskRegistry
	forwardingRules *forwardingRuleRegistry
	firewallRules   *firewallRuleRegistry
	images          *imageRegistry
	machineImages   *machineImageRegistry
	instances       *instanceRegistry
	networks        *networkRegistry
	subnetworks     *subnetworkRegistry
	targetInstances *targetInstanceRegistry
	objects         *objectRegistry
	snapshots       *snapshotRegistry
	// Cache of resources.
	machineTypeCache            twoDResourceCache
	instanceCache               twoDResourceCache
	diskCache                   twoDResourceCache
	subnetworkCache             twoDResourceCache
	targetInstanceCache         twoDResourceCache
	forwardingRuleCache         twoDResourceCache
	imageCache                  oneDResourceCache
	imageFamilyCache            oneDResourceCache
	machineImageCache           oneDResourceCache
	networkCache                oneDResourceCache
	firewallRuleCache           oneDResourceCache
	zonesCache                  oneDResourceCache
	regionsCache                oneDResourceCache
	licenseCache                oneDResourceCache
	snapshotCache               oneDResourceCache
	stepTimeRecords             []TimeRecord
	serialControlOutputValues   map[string]string
	serialControlOutputValuesMx sync.Mutex // guards serialControlOutputValues
	// ForceCleanupOnError forces cleanup on error of all resources, including
	// those marked with NoCleanup.
	ForceCleanupOnError bool
	// forceCleanup is set to true when resources should be forced clean, even when NoCleanup is set to true
	forceCleanup bool
	// cancelReason provides a custom reason when the workflow is canceled.
	// Guarded by cancelMx.
	cancelReason string
}
// DisableCloudLogging disables logging to Cloud Logging for this workflow.
func (w *Workflow) DisableCloudLogging() {
	w.cloudLoggingDisabled = true
}
// DisableGCSLogging disables logging to GCS for this workflow.
func (w *Workflow) DisableGCSLogging() {
	w.gcsLoggingDisabled = true
}
// DisableStdoutLogging disables logging to stdout for this workflow.
func (w *Workflow) DisableStdoutLogging() {
	w.stdoutLoggingDisabled = true
}
// SkipCachingImages skips caching images for this workflow.
func (w *Workflow) SkipCachingImages() {
	w.disableCachingImages = true
}
// AddVar adds a variable with the given key and value to the Workflow.
func (w *Workflow) AddVar(k, v string) {
	if w.Vars == nil {
		// Lazily initialize so AddVar works on a zero-value Vars field.
		w.Vars = make(map[string]Var)
	}
	w.Vars[k] = Var{Value: v}
}
// AddSerialConsoleOutputValue adds a serial-output key-value pair to the Workflow.
func (w *Workflow) AddSerialConsoleOutputValue(k, v string) {
	w.serialControlOutputValuesMx.Lock()
	defer w.serialControlOutputValuesMx.Unlock()
	if w.serialControlOutputValues == nil {
		// Lazily initialize the map on first use.
		w.serialControlOutputValues = make(map[string]string)
	}
	w.serialControlOutputValues[k] = v
}
// GetSerialConsoleOutputValue gets a serial-output value by key.
// It is safe to call concurrently with AddSerialConsoleOutputValue.
func (w *Workflow) GetSerialConsoleOutputValue(k string) string {
	// Take the mutex: reads of the map would otherwise race with the
	// locked writes in AddSerialConsoleOutputValue.
	w.serialControlOutputValuesMx.Lock()
	defer w.serialControlOutputValuesMx.Unlock()
	return w.serialControlOutputValues[k]
}
// addCleanupHook registers a hook to be run during workflow cleanup.
func (w *Workflow) addCleanupHook(hook func() DError) {
	w.cleanupHooksMx.Lock()
	defer w.cleanupHooksMx.Unlock()
	w.cleanupHooks = append(w.cleanupHooks, hook)
}
// SetLogProcessHook sets a hook function to process log strings.
func (w *Workflow) SetLogProcessHook(hook func(string) string) {
	w.logProcessHook = hook
}
// Validate runs validation on the workflow: clients are populated, required
// fields checked, the workflow populated, then validated. On any failure the
// workflow is canceled and an error returned.
func (w *Workflow) Validate(ctx context.Context) DError {
	if err := w.PopulateClients(ctx); err != nil {
		w.CancelWorkflow()
		return Errf("error populating workflow: %v", err)
	}
	if err := w.validateRequiredFields(); err != nil {
		w.CancelWorkflow()
		return Errf("error validating workflow: %v", err)
	}
	if err := w.populate(ctx); err != nil {
		w.CancelWorkflow()
		return Errf("error populating workflow: %v", err)
	}
	w.LogWorkflowInfo("Validating workflow")
	if err := w.validate(ctx); err != nil {
		w.LogWorkflowInfo("Error validating workflow: %v", err)
		w.CancelWorkflow()
		return err
	}
	w.LogWorkflowInfo("Validation Complete")
	return nil
}
// WorkflowModifier is a function type for functions that can modify a Workflow object.
//
// Deprecated: This will be removed in a future release.
type WorkflowModifier func(*Workflow)
// Run validates then runs the workflow, uploading sources and cleaning up
// created resources when it finishes.
func (w *Workflow) Run(ctx context.Context) (err DError) {
	w.externalLogging = true
	if err = w.Validate(ctx); err != nil {
		return err
	}
	// Defers run LIFO: the force-cleanup flag below is set before cleanup()
	// executes, so cleanup sees it.
	defer w.cleanup()
	defer func() {
		if err != nil {
			w.forceCleanup = w.ForceCleanupOnError
		}
	}()
	// Surface the Cloud Build ID when running under Cloud Build.
	if os.Getenv("BUILD_ID") != "" {
		w.LogWorkflowInfo("Cloud Build ID: %s", os.Getenv("BUILD_ID"))
	}
	w.LogWorkflowInfo("Workflow Project: %s", w.Project)
	w.LogWorkflowInfo("Workflow Zone: %s", w.Zone)
	w.LogWorkflowInfo("Workflow GCSPath: %s", w.GCSPath)
	w.LogWorkflowInfo("Daisy scratch path: https://console.cloud.google.com/storage/browser/%s", path.Join(w.bucket, w.scratchPath))
	w.LogWorkflowInfo("Uploading sources")
	if err = w.uploadSources(ctx); err != nil {
		w.LogWorkflowInfo("Error uploading sources: %v", err)
		w.CancelWorkflow()
		return err
	}
	w.LogWorkflowInfo("Running workflow")
	// Log any collected serial-output values even when the run fails.
	defer func() {
		for k, v := range w.serialControlOutputValues {
			w.LogWorkflowInfo("Serial-output value -> %v:%v", k, v)
		}
	}()
	if err = w.run(ctx); err != nil {
		w.LogWorkflowInfo("Error running workflow: %v", err)
		return err
	}
	return nil
}
// recordStepTime records a step's execution interval on the root workflow,
// qualifying nested step names with their parent workflow name.
func (w *Workflow) recordStepTime(stepName string, startTime time.Time, endTime time.Time) {
	if w.parent != nil {
		// Delegate to the parent, prefixing the step name with this
		// workflow's name ("parent.step").
		w.parent.recordStepTime(fmt.Sprintf("%s.%s", w.Name, stepName), startTime, endTime)
		return
	}
	w.recordTimeMx.Lock()
	defer w.recordTimeMx.Unlock()
	w.stepTimeRecords = append(w.stepTimeRecords, TimeRecord{Name: stepName, StartTime: startTime, EndTime: endTime})
}
// GetStepTimeRecords returns the time records of the steps that have run.
// Records are accumulated on the root workflow only (see recordStepTime).
func (w *Workflow) GetStepTimeRecords() []TimeRecord {
	return w.stepTimeRecords
}
// cleanup cancels the workflow (if not already canceled), gives in-flight
// step goroutines a bounded window to notice, then runs all registered
// cleanup hooks.
func (w *Workflow) cleanup() {
	startTime := time.Now()
	w.LogWorkflowInfo("Workflow %q cleaning up (this may take up to 2 minutes).", w.Name)
	// Cancel only if the Cancel channel is not already closed.
	select {
	case <-w.Cancel:
	default:
		w.CancelWorkflow()
	}
	// Allow goroutines that are watching w.Cancel an opportunity
	// to detect that the workflow was cancelled and to cleanup.
	c := make(chan struct{})
	go func() {
		w.stepWait.Wait()
		close(c)
	}()
	// Bounded wait: proceed after 4s even if steps are still winding down.
	select {
	case <-c:
	case <-time.After(4 * time.Second):
	}
	for _, hook := range w.cleanupHooks {
		if err := hook(); err != nil {
			w.LogWorkflowInfo("Error returned from cleanup hook: %s", err)
		}
	}
	w.LogWorkflowInfo("Workflow %q finished cleanup.", w.Name)
	w.recordStepTime("workflow cleanup", startTime, time.Now())
}
// genName generates a lowercased resource name of the form
// "<n>-<workflow name chain>-<workflow id>", capped at 64 characters.
func (w *Workflow) genName(n string) string {
	// Build the chain of workflow names from the root down.
	chain := w.Name
	for p := w.parent; p != nil; p = p.parent {
		chain = p.Name + "-" + chain
	}
	prefix := chain
	if n != "" {
		prefix = n + "-" + chain
	}
	// Leave room for the "-<id>" suffix.
	if len(prefix) > 57 {
		prefix = prefix[:56]
	}
	result := prefix + "-" + w.id
	if len(result) > 64 {
		result = result[:63]
	}
	return strings.ToLower(result)
}
// getSourceGCSAPIPath returns the full GCS API URL for source s under this
// workflow's sources path.
func (w *Workflow) getSourceGCSAPIPath(s string) string {
	return fmt.Sprintf("%s/%s", gcsAPIBase, path.Join(w.bucket, w.sourcesPath, s))
}
// PopulateClients populates the compute, storage, and Cloud Logging clients
// for the workflow. Clients that are already set are left untouched. When no
// options are supplied, credentials are read from w.OAuthPath.
func (w *Workflow) PopulateClients(ctx context.Context, options ...option.ClientOption) error {
	// API clients instantiation.
	var (
		err            error
		computeOptions []option.ClientOption
		storageOptions []option.ClientOption
		loggingOptions []option.ClientOption
	)
	if len(options) > 0 {
		// Clone the caller's slice for computeOptions: WithEndpoint may be
		// appended below, and appending directly to the caller's slice could
		// overwrite its backing array if it has spare capacity.
		computeOptions = append([]option.ClientOption(nil), options...)
		storageOptions = options
		loggingOptions = options
	} else {
		computeOptions = []option.ClientOption{option.WithCredentialsFile(w.OAuthPath)}
		storageOptions = []option.ClientOption{option.WithCredentialsFile(w.OAuthPath)}
		loggingOptions = []option.ClientOption{option.WithCredentialsFile(w.OAuthPath)}
	}
	if w.ComputeEndpoint != "" {
		computeOptions = append(computeOptions, option.WithEndpoint(w.ComputeEndpoint))
	}
	if w.ComputeClient == nil {
		w.ComputeClient, err = compute.NewClient(ctx, computeOptions...)
		if err != nil {
			return typedErr(apiError, "failed to create compute client", err)
		}
	}
	if w.StorageClient == nil {
		w.StorageClient, err = storage.NewClient(ctx, storageOptions...)
		if err != nil {
			return err
		}
	}
	// Cloud Logging is only set up for externally-logged runs that have not
	// disabled it.
	if w.externalLogging && !w.cloudLoggingDisabled && w.CloudLoggingClient == nil {
		w.CloudLoggingClient, err = logging.NewClient(ctx, w.Project, loggingOptions...)
		if err != nil {
			return err
		}
	}
	return nil
}
// populateStep sets a step's timeout (falling back to the workflow default)
// and delegates population to the step's implementation.
func (w *Workflow) populateStep(ctx context.Context, s *Step) DError {
	if s.Timeout == "" {
		s.Timeout = w.DefaultTimeout
	}
	d, err := time.ParseDuration(s.Timeout)
	if err != nil {
		return newErr(fmt.Sprintf("failed to parse duration for workflow %v, step %v", w.Name, s.name), err)
	}
	s.timeout = d
	impl, derr := s.stepImpl()
	if derr != nil {
		return derr
	}
	return impl.populate(ctx, s)
}
// populate does the following:
// - checks that all required Vars are set.
// - instantiates API clients, if needed.
// - sets generic autovars and does a first round of var substitution.
// - sets GCS path information.
// - generates autovars from workflow fields (Name, Zone, etc) and runs a second round of var substitution.
// - sets up the logger.
// - runs populate on each step.
func (w *Workflow) populate(ctx context.Context) DError {
	for k, v := range w.Vars {
		if v.Required && v.Value == "" {
			return Errf("cannot populate workflow, required var %q is unset", k)
		}
	}
	// Set some generic autovars and run first round of var substitution.
	cwd, _ := os.Getwd() // best effort; an empty CWD autovar is acceptable
	now := time.Now().UTC()
	w.username = getUser()
	w.autovars = map[string]string{
		"ID":        w.id,
		"DATE":      now.Format("20060102"),
		"DATETIME":  now.Format("20060102150405"),
		"TIMESTAMP": strconv.FormatInt(now.Unix(), 10),
		"USERNAME":  w.username,
		"WFDIR":     w.workflowDir,
		"CWD":       cwd,
	}
	var replacements []string
	for k, v := range w.autovars {
		replacements = append(replacements, fmt.Sprintf("${%s}", k), v)
	}
	for k, v := range w.Vars {
		replacements = append(replacements, fmt.Sprintf("${%s}", k), v.Value)
	}
	substitute(reflect.ValueOf(w).Elem(), strings.NewReplacer(replacements...))
	// Parse timeout.
	timeout, err := time.ParseDuration(w.DefaultTimeout)
	if err != nil {
		return Errf("failed to parse timeout for workflow: %v", err)
	}
	w.defaultTimeout = timeout
	// Set up GCS paths. Without an explicit GCSPath, fall back to the
	// project's default daisy bucket.
	if w.GCSPath == "" {
		dBkt, err := daisyBkt(ctx, w.StorageClient, w.Project)
		if err != nil {
			return err
		}
		w.GCSPath = "gs://" + dBkt
	}
	bkt, p, derr := splitGCSPath(w.GCSPath)
	if derr != nil {
		return derr
	}
	w.bucket = bkt
	w.scratchPath = path.Join(p, fmt.Sprintf("daisy-%s-%s-%s", w.Name, now.Format("20060102-15:04:05"), w.id))
	w.sourcesPath = path.Join(w.scratchPath, "sources")
	w.logsPath = path.Join(w.scratchPath, "logs")
	w.outsPath = path.Join(w.scratchPath, "outs")
	// Generate more autovars from workflow fields. Run second round of var substitution.
	w.autovars["NAME"] = w.Name
	w.autovars["FULLNAME"] = w.genName("")
	w.autovars["ZONE"] = w.Zone
	w.autovars["PROJECT"] = w.Project
	w.autovars["GCSPATH"] = w.GCSPath
	w.autovars["SCRATCHPATH"] = fmt.Sprintf("gs://%s/%s", w.bucket, w.scratchPath)
	w.autovars["SOURCESPATH"] = fmt.Sprintf("gs://%s/%s", w.bucket, w.sourcesPath)
	w.autovars["LOGSPATH"] = fmt.Sprintf("gs://%s/%s", w.bucket, w.logsPath)
	w.autovars["OUTSPATH"] = fmt.Sprintf("gs://%s/%s", w.bucket, w.outsPath)
	replacements = []string{}
	for k, v := range w.autovars {
		replacements = append(replacements, fmt.Sprintf("${%s}", k), v)
	}
	substitute(reflect.ValueOf(w).Elem(), strings.NewReplacer(replacements...))
	if w.Logger == nil {
		w.createLogger(ctx)
	}
	// Run populate on each step.
	for name, s := range w.Steps {
		s.name = name
		s.w = w
		if err := w.populateStep(ctx, s); err != nil {
			return Errf("error populating step %q: %v", name, err)
		}
	}
	// We do this here, and not in validate, as embedded startup scripts could
	// have what we think are daisy variables.
	if err := w.validateVarsSubbed(); err != nil {
		return err
	}
	if err := w.substituteSourceVars(ctx, reflect.ValueOf(w).Elem()); err != nil {
		return err
	}
	return nil
}
// AddDependency records that dependent must run after each of dependencies.
// It returns an error if dependent or any dependency is not a step in this
// workflow. Duplicate dependencies are ignored.
func (w *Workflow) AddDependency(dependent *Step, dependencies ...*Step) error {
	if _, ok := w.Steps[dependent.name]; !ok {
		return fmt.Errorf("can't create dependency: step %q does not exist", dependent.name)
	}
	if w.Dependencies == nil {
		w.Dependencies = make(map[string][]string)
	}
	for _, d := range dependencies {
		if _, ok := w.Steps[d.name]; !ok {
			return fmt.Errorf("can't create dependency: step %q does not exist", d.name)
		}
		// Skip dependencies that are already recorded.
		if strIn(d.name, w.Dependencies[dependent.name]) {
			continue
		}
		w.Dependencies[dependent.name] = append(w.Dependencies[dependent.name], d.name)
	}
	return nil
}
// includeWorkflow wires iw into w as an included workflow: iw shares the
// parent's cancel channel and all resource registries, so resources created
// by the included workflow are tracked alongside the parent's.
func (w *Workflow) includeWorkflow(iw *Workflow) {
	iw.Cancel = w.Cancel
	iw.parent = w
	iw.disks = w.disks
	iw.forwardingRules = w.forwardingRules
	iw.firewallRules = w.firewallRules
	iw.images = w.images
	iw.machineImages = w.machineImages
	iw.instances = w.instances
	iw.networks = w.networks
	iw.subnetworks = w.subnetworks
	iw.targetInstances = w.targetInstances
	iw.snapshots = w.snapshots
	iw.objects = w.objects
}
// ID is the unique identifier for this Workflow.
func (w *Workflow) ID() string {
	return w.id
}
// NewIncludedWorkflowFromFile reads and unmarshals a workflow with the same resources as the parent.
func (w *Workflow) NewIncludedWorkflowFromFile(file string) (*Workflow, DError) {
	iw := New()
	w.includeWorkflow(iw)
	// Relative paths are resolved against the parent workflow's directory.
	if !filepath.IsAbs(file) {
		file = filepath.Join(w.workflowDir, file)
	}
	if derr := readWorkflow(file, iw); derr != nil {
		return nil, derr
	}
	return iw, nil
}
// NewStep instantiates a new, typeless step for this workflow.
// The step type must be specified before running this workflow.
func (w *Workflow) NewStep(name string) (*Step, error) {
	if _, exists := w.Steps[name]; exists {
		return nil, fmt.Errorf("can't create step %q: a step already exists with that name", name)
	}
	if w.Steps == nil {
		// Lazily initialize the step map.
		w.Steps = make(map[string]*Step)
	}
	step := &Step{name: name, w: w}
	w.Steps[name] = step
	return step, nil
}
// NewSubWorkflow instantiates a new workflow as a child to this workflow.
// Unlike included workflows, the child gets its own resource registries
// (created by New); only the cancel channel is shared.
func (w *Workflow) NewSubWorkflow() *Workflow {
	sw := New()
	sw.Cancel = w.Cancel
	sw.parent = w
	return sw
}
// NewSubWorkflowFromFile reads and unmarshals a workflow as a child to this workflow.
func (w *Workflow) NewSubWorkflowFromFile(file string) (*Workflow, DError) {
	sw := w.NewSubWorkflow()
	// Relative paths are resolved against the parent workflow's directory.
	if !filepath.IsAbs(file) {
		file = filepath.Join(w.workflowDir, file)
	}
	if derr := readWorkflow(file, sw); derr != nil {
		return nil, derr
	}
	return sw, nil
}
// Print populates then pretty prints the workflow to stdout.
// Errors along the way are printed and processing continues best-effort,
// so a partially populated workflow is still printed.
func (w *Workflow) Print(ctx context.Context) {
	// Printing never logs externally.
	w.externalLogging = false
	if err := w.PopulateClients(ctx); err != nil {
		fmt.Println("Error running PopulateClients:", err)
	}
	if err := w.populate(ctx); err != nil {
		fmt.Println("Error running populate:", err)
	}
	b, err := json.MarshalIndent(w, "", " ")
	if err != nil {
		fmt.Println("Error marshalling workflow for printing:", err)
	}
	fmt.Println(string(b))
}
// run executes the workflow's steps by traversing the dependency DAG,
// running each step via runStep.
func (w *Workflow) run(ctx context.Context) DError {
	return w.traverseDAG(func(s *Step) DError {
		return w.runStep(ctx, s)
	})
}
// runStep runs step s, returning the step's error, or a timeout error if the
// step does not finish within its configured timeout.
func (w *Workflow) runStep(ctx context.Context, s *Step) DError {
	// Use a stoppable timer instead of a sleeping goroutine so no timer
	// work lingers after a fast step finishes.
	timer := time.NewTimer(s.timeout)
	defer timer.Stop()
	// Buffered so the step goroutine can deliver its result and exit even
	// when the timeout wins the select below (otherwise it would block on
	// the send forever and leak).
	e := make(chan DError, 1)
	go func() {
		e <- s.run(ctx)
	}()
	select {
	case err := <-e:
		return err
	case <-timer.C:
		return s.getTimeoutError()
	}
}
// Concurrently traverse the DAG, running func f on each step.
// Return an error if f returns an error on any step.
func (w *Workflow) traverseDAG(f func(*Step) DError) DError {
	// waiting = steps and the dependencies they are waiting for.
	// running = the currently running steps.
	// start = map of steps' start channels/semaphores.
	// done = map of steps' done channels for signaling step completion.
	waiting := map[string][]string{}
	var running []string
	start := map[string]chan DError{}
	done := map[string]chan DError{}
	// Setup: channels, copy dependencies.
	for name := range w.Steps {
		waiting[name] = w.Dependencies[name]
		start[name] = make(chan DError)
		done[name] = make(chan DError)
	}
	// Setup: goroutine for each step. Each waits to be notified to start.
	for name, s := range w.Steps {
		go func(name string, s *Step) {
			// Wait for signal, then run the function. Return any errs.
			if err := <-start[name]; err != nil {
				done[name] <- err
			} else if err := f(s); err != nil {
				done[name] <- err
			}
			// Closing done with no sent value signals success; stepsListen
			// distinguishes the two cases via the receive's ok flag.
			close(done[name])
		}(name, s)
	}
	// Main signaling logic.
	for len(waiting) != 0 || len(running) != 0 {
		// If we got a Cancel signal, kill all waiting steps.
		// Let running steps finish.
		select {
		case <-w.Cancel:
			waiting = map[string][]string{}
		default:
		}
		// Kick off all steps that aren't waiting for anything.
		// Closing start releases the step's goroutine with a nil err.
		for name, deps := range waiting {
			if len(deps) == 0 {
				delete(waiting, name)
				running = append(running, name)
				close(start[name])
			}
		}
		// Sanity check. There should be at least one running step,
		// but loop back through if there isn't.
		if len(running) == 0 {
			continue
		}
		// Get next finished step. Return the step error if it erred.
		finished, err := stepsListen(running, done)
		if err != nil {
			return err
		}
		// Remove finished step from other steps' waiting lists.
		for name, deps := range waiting {
			waiting[name] = filter(deps, finished)
		}
		// Remove finished from currently running list.
		running = filter(running, finished)
	}
	return nil
}
// New instantiates a new workflow with initialized maps, resource
// registries, a default cleanup hook, and a random 5-character id.
func New() *Workflow {
	// We can't use context.WithCancel as we use the context even after cancel for cleanup.
	w := &Workflow{Cancel: make(chan struct{})}
	// Init nil'ed fields
	w.Sources = map[string]string{}
	w.Vars = map[string]Var{}
	w.Steps = map[string]*Step{}
	w.Dependencies = map[string][]string{}
	w.DefaultTimeout = defaultTimeout
	w.autovars = map[string]string{}
	// Resource registries and cleanup.
	w.disks = newDiskRegistry(w)
	w.forwardingRules = newForwardingRuleRegistry(w)
	w.firewallRules = newFirewallRuleRegistry(w)
	w.images = newImageRegistry(w)
	w.machineImages = newMachineImageRegistry(w)
	w.instances = newInstanceRegistry(w)
	w.networks = newNetworkRegistry(w)
	w.subnetworks = newSubnetworkRegistry(w)
	w.objects = newObjectRegistry(w)
	w.targetInstances = newTargetInstanceRegistry(w)
	w.snapshots = newSnapshotRegistry(w)
	w.addCleanupHook(func() DError {
		w.instances.cleanup() // instances need to be done before disks/networks
		w.images.cleanup()
		w.machineImages.cleanup()
		w.disks.cleanup()
		w.forwardingRules.cleanup()
		w.targetInstances.cleanup()
		w.firewallRules.cleanup()
		w.subnetworks.cleanup()
		w.networks.cleanup()
		w.snapshots.cleanup()
		return nil
	})
	w.id = randString(5)
	return w
}
// NewFromFile reads and unmarshals a workflow file.
// Recursively reads sub and included steps as well,
// when the filenames for those workflows do not contain
// a variable. If they contain a variable, they will be
// read during their populate step.
func NewFromFile(file string) (w *Workflow, err error) {
	w = New()
	// Keep the DError in its own variable so a nil DError is never boxed
	// into the error result.
	if derr := readWorkflow(file, w); derr != nil {
		return nil, derr
	}
	return w, nil
}
// JSONError turns an error from json.Unmarshal into a more user-friendly
// error that pinpoints the offending line of the input with a caret.
// Errors other than *json.SyntaxError are returned unchanged.
func JSONError(file string, data []byte, err error) error {
	sErr, ok := err.(*json.SyntaxError)
	if !ok {
		return err
	}
	// Byte offset where the line containing the error begins.
	start := bytes.LastIndex(data[:sErr.Offset], []byte("\n")) + 1
	// The line ends at the next newline, or at EOF for the last line.
	end := len(data)
	if i := bytes.Index(data[start:], []byte("\n")); i >= 0 {
		end = start + i
	}
	// 1-based line number of the error.
	line := bytes.Count(data[:start], []byte("\n")) + 1
	// Column for the caret marker; shift left by one except at column 0.
	pos := int(sErr.Offset) - start
	if pos > 0 {
		pos--
	}
	return fmt.Errorf("%s: JSON syntax error in line %d: %s \n%s\n%s^", file, line, err, data[start:end], strings.Repeat(" ", pos))
}
// readWorkflow reads the JSON workflow definition in file into w, resolving
// OAuthPath relative to the workflow's directory and eagerly loading any
// sub/included workflows whose paths contain no daisy variables.
func readWorkflow(file string, w *Workflow) (derr DError) {
	data, err := ioutil.ReadFile(file)
	if err != nil {
		return newErr("failed to read workflow file", err)
	}
	w.workflowDir, err = filepath.Abs(filepath.Dir(file))
	if err != nil {
		return newErr("failed to get absolute path of workflow file", err)
	}
	if err := json.Unmarshal(data, &w); err != nil {
		return newErr("failed to unmarshal workflow file", JSONError(file, data, err))
	}
	if w.OAuthPath != "" && !filepath.IsAbs(w.OAuthPath) {
		w.OAuthPath = filepath.Join(w.workflowDir, w.OAuthPath)
	}
	for name, step := range w.Steps {
		step.name = name
		step.w = w
		// Only recurse into nested workflows whose paths are fully resolved;
		// paths containing variables are read later, during populate.
		if step.SubWorkflow != nil &&
			step.SubWorkflow.Path != "" &&
			!hasVariableDeclaration(step.SubWorkflow.Path) {
			step.SubWorkflow.Workflow, derr = w.NewSubWorkflowFromFile(step.SubWorkflow.Path)
		} else if step.IncludeWorkflow != nil &&
			step.IncludeWorkflow.Path != "" &&
			!hasVariableDeclaration(step.IncludeWorkflow.Path) {
			step.IncludeWorkflow.Workflow, derr = w.NewIncludedWorkflowFromFile(step.IncludeWorkflow.Path)
		} else {
			continue
		}
		if derr != nil {
			return derr
		}
	}
	return nil
}
// stepsListen blocks until the first of the named steps finishes or errs,
// returning that step's name and its error (nil on success).
func stepsListen(names []string, chans map[string]chan DError) (string, DError) {
	// Build a dynamic select over every running step's done channel.
	cases := make([]reflect.SelectCase, len(names))
	for i, name := range names {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(chans[name])}
	}
	caseIndex, value, recvOk := reflect.Select(cases)
	name := names[caseIndex]
	if recvOk {
		// recvOk -> a value was sent -> the step failed; return the error.
		// A closed channel (normal completion) yields recvOk == false.
		return name, value.Interface().(DError)
	}
	return name, nil
}
// IterateWorkflowSteps iterates over all workflow steps, including included
// workflow steps, and calls cb callback function.
// Note: it recurses into IncludeWorkflow steps only; SubWorkflow steps are
// visited but not traversed into.
//
// Deprecated: This will be removed in a future release.
func (w *Workflow) IterateWorkflowSteps(cb func(step *Step)) {
	for _, step := range w.Steps {
		if step.IncludeWorkflow != nil {
			// Recurse into included workflow before visiting its step.
			step.IncludeWorkflow.Workflow.IterateWorkflowSteps(cb)
		}
		cb(step)
	}
}
// CancelWithReason cancels workflow with a specific reason.
// The specific reason replaces "is canceled" in the default error message.
// Multiple invocations will not cause an error, but only the first reason
// will be retained.
func (w *Workflow) CancelWithReason(reason string) {
	w.cancelMx.Lock()
	if w.cancelReason == "" {
		w.cancelReason = reason
	}
	// Unlock before CancelWorkflow: it acquires the same non-reentrant mutex,
	// so holding the lock here would deadlock.
	w.cancelMx.Unlock()
	w.CancelWorkflow()
}
// CancelWorkflow cancels the workflow. Safe to call multiple times.
// Prefer this to closing the w.Cancel channel,
// which will panic if it has already been closed.
func (w *Workflow) CancelWorkflow() {
	w.cancelMx.Lock()
	defer w.cancelMx.Unlock()
	if !w.isCanceled {
		w.isCanceled = true
		// Extra guard in case something manually closed the channel.
		defer func() { recover() }()
		close(w.Cancel)
	}
}
// getCancelReason returns this workflow's cancel reason, or the nearest
// ancestor's reason when this workflow has none. Returns "" if no workflow
// in the chain has a reason set.
func (w *Workflow) getCancelReason() string {
	// Walk up the parent chain until a non-empty reason is found. (The old
	// pre-loop read of w.cancelReason was redundant: the loop starts at w.)
	for wi := w; wi != nil; wi = wi.parent {
		if wi.cancelReason != "" {
			return wi.cancelReason
		}
	}
	return ""
}
// onStepCancel logs and returns the cancellation error for step s, using the
// workflow chain's cancel reason (or "is canceled" by default) in the message.
// stepClass describes the step type for the message. Returns nil when s is nil.
func (w *Workflow) onStepCancel(s *Step, stepClass string) DError {
	if s == nil {
		return nil
	}
	cancelReason := w.getCancelReason()
	if cancelReason == "" {
		cancelReason = "is canceled"
	}
	// NOTE(review): cancelReason is spliced into a format string; a reason
	// containing % verbs would garble the message — confirm callers of
	// CancelWithReason never pass such reasons.
	errorMessageFormat := "Step %q (%s) " + cancelReason + "."
	s.w.LogWorkflowInfo(errorMessageFormat, s.name, stepClass)
	return Errf(errorMessageFormat, s.name, stepClass)
}