step_wait_for_instances_signal.go (367 lines of code) (raw):

// Copyright 2017 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package daisy import ( "context" "encoding/json" "fmt" "regexp" "strings" "sync" "time" "google.golang.org/api/googleapi" ) const ( defaultInterval = "10s" ) var ( serialOutputValueRegex = regexp.MustCompile(".*<serial-output key:'(.*)' value:'(.*)'>") ) // WaitForInstancesSignal is a Daisy WaitForInstancesSignal workflow step. type WaitForInstancesSignal []*InstanceSignal // WaitForAnyInstancesSignal is a Daisy WaitForAnyInstancesSignal workflow step. type WaitForAnyInstancesSignal []*InstanceSignal // FailureMatches is a list of matching failure strings. type FailureMatches []string // UnmarshalJSON unmarshals FailureMatches. func (fms *FailureMatches) UnmarshalJSON(b []byte) error { var s string if err := json.Unmarshal(b, &s); err == nil { *fms = []string{s} return nil } //not a string, try unmarshalling into an array. Need a temp type to avoid infinite loop. var ss []string if err := json.Unmarshal(b, &ss); err != nil { return err } *fms = FailureMatches(ss) return nil } // SerialOutput describes text signal strings that will be written to the serial // port. // A StatusMatch will print out the matching line from the StatusMatch onward. // This step will not complete until a line in the serial output matches // SuccessMatch or FailureMatch. A match with FailureMatch will cause the step to fail. type SerialOutput struct { Port int64 `json:",omitempty"` SuccessMatch string `json:",omitempty"` FailureMatch FailureMatches `json:"failureMatch,omitempty"` StatusMatch string `json:",omitempty"` } // GuestAttribute describes text signal strings that will be written to guest // attributes. // This step will not complete until the key exists and matches the value in // SuccessValue (if specified and non empty). If SuccessValue is set, any other // value in the key will cause the step to fail. type GuestAttribute struct { Namespace string `json:",omitempty"` KeyName string `json:",omitempty"` SuccessValue string `json:",omitempty"` } // InstanceSignal waits for a signal from an instance. type InstanceSignal struct { // Instance name to wait for. Name string // Interval to check for signal (default is 5s). // Must be parsable by https://golang.org/pkg/time/#ParseDuration. Interval string `json:",omitempty"` interval time.Duration // DEPRECATED use Status signal instead // Wait for the instance to stop. Stopped bool `json:",omitempty"` // Wait for a string match in the serial output. SerialOutput *SerialOutput `json:",omitempty"` // Wait for a key or value match in guest attributes. GuestAttribute *GuestAttribute `json:",omitempty"` // Wait for the instance to have one of the given statuses // Cannot be set at the same time as Stopped Status []string `json:",omitempty"` } func waitForInstanceStopped(s *Step, project, zone, name string, interval time.Duration) DError { w := s.w w.LogStepInfo(s.name, "WaitForInstancesSignal", "Waiting for instance %q to stop.", name) tick := time.Tick(interval) for { select { case <-s.w.Cancel: return nil case <-tick: stopped, err := s.w.ComputeClient.InstanceStopped(project, zone, name) if err != nil { return typedErr(apiError, "failed to check whether instance is stopped", err) } if stopped { w.LogStepInfo(s.name, "WaitForInstancesSignal", "Instance %q stopped.", name) return nil } } } } func waitForInstanceStatus(s *Step, project, zone, name string, interval time.Duration, target []string) DError { w := s.w w.LogStepInfo(s.name, "WaitForInstancesSignal", "Waiting for instance %q to have status one of %v.", name, target) tick := time.Tick(interval) for { select { case <-s.w.Cancel: return nil case <-tick: status, err := s.w.ComputeClient.InstanceStatus(project, zone, name) if err != nil { return typedErr(apiError, fmt.Sprintf("failed to check instance %s status", name), err) } for _, tstatus := range target { if status == tstatus { w.LogStepInfo(s.name, "WaitForInstancesSignal", "Instance %q is %s, done waiting for status.", name, tstatus) return nil } } } } } func waitForSerialOutput(s *Step, project, zone, name string, so *SerialOutput, interval time.Duration) DError { w := s.w msg := fmt.Sprintf("Instance %q: watching serial port %d", name, so.Port) if so.SuccessMatch != "" { msg += fmt.Sprintf(", SuccessMatch: %q", so.SuccessMatch) } if len(so.FailureMatch) > 0 { msg += fmt.Sprintf(", FailureMatch: %q (this is not an error)", so.FailureMatch) } if so.StatusMatch != "" { msg += fmt.Sprintf(", StatusMatch: %q", so.StatusMatch) } w.LogStepInfo(s.name, "WaitForInstancesSignal", msg+".") var start int64 var errs int tailString := "" tick := time.Tick(interval) for { select { case <-s.w.Cancel: return nil case <-tick: resp, err := w.ComputeClient.GetSerialPortOutput(project, zone, name, so.Port, start) if err != nil { status, sErr := w.ComputeClient.InstanceStatus(project, zone, name) if sErr != nil { err = fmt.Errorf("%v, error getting InstanceStatus: %v", err, sErr) } else { err = fmt.Errorf("%v, InstanceStatus: %q", err, status) } // Wait until machine restarts to evaluate SerialOutput. if status == "TERMINATED" || status == "STOPPED" || status == "STOPPING" { continue } // Retry up to 3 times in a row on any error if we successfully got InstanceStatus. if errs < 3 { errs++ continue } return Errf("WaitForInstancesSignal: instance %q: error getting serial port: %v", name, err) } start = resp.Next lines := strings.Split(resp.Contents, "\n") for i, ln := range lines { // If there is a unconsumed tail string from the previous block of content, concat it with the 1st line of the new block of content. if i == 0 && tailString != "" { ln = tailString + ln tailString = "" } // If the content is not ended with a "\n", we want to store the last line as tail string, so it can be concat with the next block of content. if i == len(lines)-1 && lines[len(lines)-1] != "" { tailString = ln break } if so.StatusMatch != "" { if i := strings.Index(ln, so.StatusMatch); i != -1 { w.LogStepInfo(s.name, "WaitForInstancesSignal", "Instance %q: StatusMatch found: %q", name, strings.TrimSpace(ln[i:])) extractOutputValue(w, ln) } } if len(so.FailureMatch) > 0 { for _, failureMatch := range so.FailureMatch { if i := strings.Index(ln, failureMatch); i != -1 { errMsg := strings.TrimSpace(ln[i:]) format := "WaitForInstancesSignal FailureMatch found for %q: %q" return newErr(errMsg, fmt.Errorf(format, name, errMsg)) } } } if so.SuccessMatch != "" { if i := strings.Index(ln, so.SuccessMatch); i != -1 { w.LogStepInfo(s.name, "WaitForInstancesSignal", "Instance %q: SuccessMatch found %q", name, strings.TrimSpace(ln[i:])) return nil } } } errs = 0 } } } func waitForGuestAttribute(s *Step, project, zone, name string, ga *GuestAttribute, interval time.Duration) DError { var keyTokens []string if ga.Namespace != "" { keyTokens = append(keyTokens, ga.Namespace) } keyTokens = append(keyTokens, ga.KeyName) varkey := strings.Join(keyTokens, "/") w := s.w msg := fmt.Sprintf("Instance %q: watching for key %s", name, varkey) if ga.SuccessValue != "" { msg += fmt.Sprintf(", SuccessValue: %q", ga.SuccessValue) } w.LogStepInfo(s.name, "WaitForInstancesSignal", msg+".") // The limit for querying guest attributes is documented as 10 queries/minute. minInterval, err := time.ParseDuration("6s") if err == nil && interval < minInterval { interval = minInterval } tick := time.Tick(interval) var errs int for { select { case <-s.w.Cancel: return nil case <-tick: resp, err := w.ComputeClient.GetGuestAttributes(project, zone, name, "", varkey) if err != nil { if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == 404 { // 404 is OK, that means the key isn't present yet. Retry until timeout. continue } status, sErr := w.ComputeClient.InstanceStatus(project, zone, name) if sErr != nil { err = fmt.Errorf("%v, error getting InstanceStatus: %v", err, sErr) errs++ } else { errs = 0 } // Wait until machine restarts to get Guest Attributes if status == "TERMINATED" || status == "STOPPED" || status == "STOPPING" { continue } // Permit up to 3 consecutive non-404 errors getting guest attrs so long as we can get instance // status. if errs < 3 { continue } return Errf("WaitForInstancesSignal: instance %q: error getting guest attribute: %v", name, err) } if ga.SuccessValue != "" { if resp.VariableValue != ga.SuccessValue { errMsg := strings.TrimSpace(resp.VariableValue) format := "WaitForInstancesSignal bad guest attribute value found for %q: %q" return Errf(format, name, errMsg) } w.LogStepInfo(s.name, "WaitForInstancesSignal", "Instance %q: SuccessValue found for key %q", name, ga.KeyName) return nil } w.LogStepInfo(s.name, "WaitForInstancesSignal", "Instance %q found key %q", name, ga.KeyName) return nil } } } func extractOutputValue(w *Workflow, s string) { if matches := serialOutputValueRegex.FindStringSubmatch(s); matches != nil && len(matches) == 3 { for w.parent != nil { w = w.parent } w.AddSerialConsoleOutputValue(matches[1], matches[2]) } } func (w *WaitForInstancesSignal) populate(ctx context.Context, s *Step) DError { is := (*[]*InstanceSignal)(w) return populateForWaitForInstancesSignal(is, "wait_for_instance_signal") } func (w *WaitForAnyInstancesSignal) populate(ctx context.Context, s *Step) DError { is := (*[]*InstanceSignal)(w) return populateForWaitForInstancesSignal(is, "wait_for_any_instance_signal") } func populateForWaitForInstancesSignal(w *[]*InstanceSignal, sn string) DError { for _, ws := range *w { if ws.Interval == "" { ws.Interval = defaultInterval } var err error ws.interval, err = time.ParseDuration(ws.Interval) if err != nil { return newErr(fmt.Sprintf("failed to parse duration for step %v", sn), err) } } return nil } func (w *WaitForInstancesSignal) run(ctx context.Context, s *Step) DError { is := (*[]*InstanceSignal)(w) return runForWaitForInstancesSignal(is, s, true) } func (w *WaitForAnyInstancesSignal) run(ctx context.Context, s *Step) DError { is := (*[]*InstanceSignal)(w) return runForWaitForInstancesSignal(is, s, false) } func runForWaitForInstancesSignal(w *[]*InstanceSignal, s *Step, waitAll bool) DError { var wg sync.WaitGroup e := make(chan DError) for _, is := range *w { wg.Add(1) go func(is *InstanceSignal) { defer wg.Done() i, ok := s.w.instances.get(is.Name) if !ok { e <- Errf("unresolved instance %q", is.Name) return } m := NamedSubexp(instanceURLRgx, i.link) serialSig := make(chan struct{}) guestSig := make(chan struct{}) statusSig := make(chan struct{}) if is.Stopped { go func() { if err := waitForInstanceStopped(s, m["project"], m["zone"], m["instance"], is.interval); err != nil { e <- err } close(statusSig) }() } else if len(is.Status) > 0 { go func() { if err := waitForInstanceStatus(s, m["project"], m["zone"], m["instance"], is.interval, is.Status); err != nil { e <- err } close(statusSig) }() } if is.SerialOutput != nil { go func() { if err := waitForSerialOutput(s, m["project"], m["zone"], m["instance"], is.SerialOutput, is.interval); err != nil || !waitAll { // send a signal to end other waiting instances e <- err } close(serialSig) }() } if is.GuestAttribute != nil { go func() { if err := waitForGuestAttribute(s, m["project"], m["zone"], m["instance"], is.GuestAttribute, is.interval); err != nil || !waitAll { // send a signal to end other waiting instances e <- err } close(guestSig) }() } select { case <-guestSig: return case <-serialSig: return case <-statusSig: return } }(is) } go func() { wg.Wait() e <- nil }() select { case err := <-e: return err case <-s.w.Cancel: return nil } } func (w *WaitForInstancesSignal) validate(ctx context.Context, s *Step) DError { is := (*[]*InstanceSignal)(w) return validateForWaitForInstancesSignal(is, s) } func (w *WaitForAnyInstancesSignal) validate(ctx context.Context, s *Step) DError { is := (*[]*InstanceSignal)(w) return validateForWaitForInstancesSignal(is, s) } func validateForWaitForInstancesSignal(w *[]*InstanceSignal, s *Step) DError { // Instance checking. for _, i := range *w { if _, err := s.w.instances.regUse(i.Name, s); err != nil { return err } if i.interval == 0*time.Second { return Errf("%q: cannot wait for instance signal, no interval given", i.Name) } if i.Stopped && len(i.Status) > 0 { return Errf("%q: Stopped and Status cannot be set simultaneously", i.Name) } if i.SerialOutput == nil && i.GuestAttribute == nil && i.Stopped == false && len(i.Status) < 1 { return Errf("%q: cannot wait for instance signal, nothing to wait for", i.Name) } if i.SerialOutput != nil { if i.SerialOutput.Port == 0 { return Errf("%q: cannot wait for instance signal via SerialOutput, no Port given", i.Name) } if i.SerialOutput.SuccessMatch == "" && len(i.SerialOutput.FailureMatch) == 0 { return Errf("%q: cannot wait for instance signal via SerialOutput, no SuccessMatch or FailureMatch given", i.Name) } } } return nil }