cli_tools/common/utils/daisyutils/daisy_worker.go (165 lines of code) (raw):

// Copyright 2020 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package daisyutils import ( "errors" "fmt" "sync" daisy "github.com/GoogleCloudPlatform/compute-daisy" "github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/assert" "github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/utils/logging" ) // To rebuild the mock for DaisyWorker, run `go generate ./...` //go:generate go run github.com/golang/mock/mockgen -package mocks -source $GOFILE -destination ../../../mocks/mock_daisy_worker.go // DaisyWorker is a facade over daisy.Workflow to facilitate mocking. type DaisyWorker interface { Run(vars map[string]string) error RunAndReadSerialValue(key string, vars map[string]string) (string, error) RunAndReadSerialValues(vars map[string]string, keys ...string) (map[string]string, error) Cancel(reason string) bool } // WorkflowProvider returns a new instance of a Daisy workflow. type WorkflowProvider func() (*daisy.Workflow, error) // NewDaisyWorker returns an implementation of DaisyWorker. The returned value is // designed to be run once and discarded. In other words, don't run the same instance twice. // // hooks contains additional WorkflowPreHook or WorkflowPostHook instances. If hooks doesn't // include a resource labeler, one will be created. func NewDaisyWorker(wf WorkflowProvider, env EnvironmentSettings, logger logging.Logger, hooks ...interface{}) DaisyWorker { hooks = append(createResourceLabelerIfMissing(env, hooks), &ApplyEnvToWorkflow{env}, &ConfigureDaisyLogging{env}, &FallbackToPDStandard{logger: logger}, ) if env.NoExternalIP { hooks = append(hooks, &RemoveExternalIPHook{}) } if env.NestedVirtualizationEnabled { hooks = append(hooks, &EnableNestedVirtualizationHook{}) } if len(env.WorkerMachineSeries) >= 1 { updateMachineHook := &UpdateMachineTypesHook{logger: logger} updateMachineHook.primaryMachineSeries = env.WorkerMachineSeries[0] if len(env.WorkerMachineSeries) >= 2 { updateMachineHook.secondaryMachineSeries = env.WorkerMachineSeries[1] } hooks = append(hooks, updateMachineHook) } else { logger.Debug("UpdateMachineTypesHook is not activated because machine series are not specified.") } for _, hook := range hooks { switch hook.(type) { case WorkflowPreHook: continue case WorkflowPostHook: continue default: panic(fmt.Sprintf("%T must implement WorkflowPreHook and/or WorkflowPostHook", hook)) } } return &defaultDaisyWorker{workflowProvider: wf, cancel: make(chan string, 1), env: env, logger: logger, hooks: hooks} } // createResourceLabelerIfMissing checks whether there is a resource labeler in hook. // If not, then it creates a new one. func createResourceLabelerIfMissing(env EnvironmentSettings, hooks []interface{}) []interface{} { for _, hook := range hooks { switch hook.(type) { case *ResourceLabeler: return hooks } } assert.NotEmpty(env.Tool.ResourceLabelName) assert.NotEmpty(env.ExecutionID) return append(hooks, NewResourceLabeler( env.Tool.ResourceLabelName, env.ExecutionID, env.Labels, env.StorageLocation)) } type defaultDaisyWorker struct { workflowProvider WorkflowProvider finishedWf *daisy.Workflow logger logging.Logger env EnvironmentSettings hooks []interface{} cancel chan string cancelGuard sync.Once } // Run runs the daisy workflow with the supplied vars. func (w *defaultDaisyWorker) Run(vars map[string]string) (err error) { var wf *daisy.Workflow for attempts := 0; attempts <= 1; attempts++ { if wf, err = w.workflowProvider(); err != nil { break } if err = w.checkIfCancelled(wf); err != nil { break } var retryRequested bool retryRequested, err = w.runOnce(wf, vars) if err == nil || !retryRequested { break } w.logger.Debug(fmt.Sprintf("retryRequested=%v. err=%v", retryRequested, err)) } w.finishedWf = wf return err } // checkIfCancelled determines whether the workflow has been cancelled internally, // or whether a client of DaisyWorker has called cancel. If so, then a non-nil // error is returned describing the cancellation. func (w *defaultDaisyWorker) checkIfCancelled(wf *daisy.Workflow) (err error) { canceled := false reason := "" select { case reason = <-w.cancel: canceled = true case <-wf.Cancel: canceled = true default: break } if canceled { msg := "workflow canceled" if reason != "" { msg = fmt.Sprintf("%s: %s", msg, reason) } err = errors.New(msg) } return err } // runOnce applies vars to the workflow, runs hooks, and runs the workflow. The retry // return value indicates whether a hook has requested a retry. func (w *defaultDaisyWorker) runOnce(wf *daisy.Workflow, vars map[string]string) (retry bool, err error) { if err := (&ApplyAndValidateVars{w.env, vars}).PreRunHook(wf); err != nil { return false, err } for _, hook := range w.hooks { preHook, isPreHook := hook.(WorkflowPreHook) if isPreHook { if err := preHook.PreRunHook(wf); err != nil { return false, err } } } err = RunWorkflowWithCancelSignal(wf, w.cancel) if wf.Logger != nil { for _, trace := range wf.Logger.ReadSerialPortLogs() { w.logger.Trace(trace) } } if err != nil { PostProcessDErrorForNetworkFlag(w.env.Tool.HumanReadableName, err, w.env.Network, wf) for _, hook := range w.hooks { postHook, isPostHook := hook.(WorkflowPostHook) if isPostHook { wantRetry := false wantRetry, err = postHook.PostRunHook(err) retry = retry || wantRetry } } } return retry, err } // RunAndReadSerialValue runs the daisy workflow with the supplied vars, and returns the serial // output value associated with the supplied key. func (w *defaultDaisyWorker) RunAndReadSerialValue(key string, vars map[string]string) (string, error) { m, err := w.RunAndReadSerialValues(vars, key) return m[key], err } // RunAndReadSerialValues runs the daisy workflow with the supplied vars, and returns the serial // output values associated with the supplied keys. func (w *defaultDaisyWorker) RunAndReadSerialValues(vars map[string]string, keys ...string) (map[string]string, error) { err := w.Run(vars) m := map[string]string{} if w.finishedWf != nil { for _, key := range keys { m[key] = w.finishedWf.GetSerialConsoleOutputValue(key) } } return m, err } func (w *defaultDaisyWorker) Cancel(reason string) bool { // once.Do is required to ensure that additional calls // to Cancel won't write to a closed channel. w.cancelGuard.Do( func() { w.cancel <- reason close(w.cancel) }, ) return true }