step_create_instances.go (193 lines of code) (raw):
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package daisy
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"path"
"strings"
"sync"
"time"
"google.golang.org/api/googleapi"
)
// CreateInstances is a Daisy CreateInstances workflow step.
//
// Its JSON form is a plain array of instance definitions. UnmarshalJSON
// decodes that same array into both fields below, and
// instanceUsesBetaFeatures decides which of the two collections validate and
// run operate on.
type CreateInstances struct {
	// Instances holds the step's instances decoded as GA (*Instance) values.
	Instances []*Instance
	// InstancesBeta holds the same instances decoded as Beta (*InstanceBeta)
	// values. This collection is selected when any entry sets
	// SourceMachineImage, or when Instances is empty.
	InstancesBeta []*InstanceBeta
}
// UnmarshalJSON unmarshals a CreateInstances step from a JSON array of
// instance definitions.
//
// The same array is decoded twice, once into InstancesBeta and once into
// Instances, so both the Beta and GA representations are available later.
// instanceUsesBetaFeatures picks which one is used. The receiver is modified
// only after both decodes succeed, so on error ci is left unchanged rather
// than half-populated.
func (ci *CreateInstances) UnmarshalJSON(b []byte) error {
	var instancesBeta []*InstanceBeta
	if err := json.Unmarshal(b, &instancesBeta); err != nil {
		return err
	}

	var instances []*Instance
	if err := json.Unmarshal(b, &instances); err != nil {
		return err
	}

	ci.InstancesBeta = instancesBeta
	ci.Instances = instances
	return nil
}
// logSerialOutput streams the output of serial port `port` of instance ii to
// GCS and to the workflow logger.
//
// On every tick of interval it:
//   - fetches new serial output starting at the offset returned by the
//     previous read;
//   - appends it to an in-memory buffer and forwards the new chunk to
//     w.Logger.AppendSerialPortLogs;
//   - re-uploads the whole buffer to object <w.logsPath>/<name>-serial-port<port>.log
//     in w.bucket.
//
// Only the first GCS failure is logged. Polling ends when:
//   - a read fails and InstanceStatus successfully reports TERMINATED,
//     STOPPED or STOPPING;
//   - more than 10 consecutive reads fail; or
//   - w.isCanceled is set (checked after every successful read).
//
// On exit the collected output is passed to
// w.Logger.WriteSerialPortLogsToCloudLogging. The function registers itself
// on w.stepWait so the workflow can wait for it to finish.
func logSerialOutput(ctx context.Context, s *Step, ii InstanceInterface, ib *InstanceBase, port int64, interval time.Duration) {
	w := s.w
	// NOTE(review): Add runs inside the goroutine the caller started, so it
	// can race with a concurrent w.stepWait.Wait(). Consider calling Add at
	// the call site, before the `go` statement.
	w.stepWait.Add(1)
	defer w.stepWait.Done()

	logsObj := path.Join(w.logsPath, fmt.Sprintf("%s-serial-port%d.log", ii.getName(), port))
	w.LogStepInfo(s.name, "CreateInstances", "Streaming instance %q serial port %d output to https://storage.cloud.google.com/%s/%s", ii.getName(), port, w.bucket, logsObj)

	var (
		start          int64        // offset of the next serial read
		buf            bytes.Buffer // all serial output read so far
		gcsErr         bool         // a GCS error has already been logged
		readFromSerial bool         // at least one read has succeeded
		numErr         int          // consecutive read failures
	)

	// An explicit ticker is stopped when streaming ends; time.Tick would keep
	// the underlying ticker alive after this function returns.
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

Loop:
	for range ticker.C {
		resp, err := w.ComputeClient.GetSerialPortOutput(path.Base(ib.Project), path.Base(ii.getZone()), ii.getName(), port, start)
		if err != nil {
			numErr++
			status, sErr := w.ComputeClient.InstanceStatus(path.Base(ib.Project), path.Base(ii.getZone()), ii.getName())
			switch status {
			case "TERMINATED", "STOPPED", "STOPPING":
				// Instance is stopped or stopping.
				if sErr == nil {
					break Loop
				}
			}
			if numErr > 10 {
				// Only emit an error log if we never read *any* data from the
				// instance. Once some output has been read, repeated failures
				// are most likely a race where the instance shut down fast
				// enough that InstanceStatus returned a 404.
				if !readFromSerial {
					w.LogStepInfo(s.name, "CreateInstances",
						"Instance %q: error getting serial port: %v", ii.getName(), err)
				}
				break Loop
			}
			continue
		}

		readFromSerial = true
		numErr = 0
		start = resp.Next
		buf.WriteString(resp.Contents)
		w.Logger.AppendSerialPortLogs(w, ii.getName(), resp.Contents)

		// Re-upload the full log; only the first GCS failure is reported.
		wc := w.StorageClient.Bucket(w.bucket).Object(logsObj).NewWriter(ctx)
		wc.ContentType = "text/plain"
		if _, err := wc.Write(buf.Bytes()); err != nil {
			// NOTE(review): the writer is intentionally not closed after a
			// failed Write (kept from the original). Confirm the storage
			// client does not need Close to release resources.
			if !gcsErr {
				gcsErr = true
				w.LogStepInfo(s.name, "CreateInstances", "Instance %q: error writing log to GCS: %v", ii.getName(), err)
			}
		} else if err := wc.Close(); err != nil && !gcsErr {
			gcsErr = true
			w.LogStepInfo(s.name, "CreateInstances", "Instance %q: error saving log to GCS: %v", ii.getName(), err)
		}

		// Checked whether or not the upload succeeded, so a persistent GCS
		// failure cannot keep this goroutine polling after cancellation.
		if w.isCanceled {
			break Loop
		}
	}
	w.Logger.WriteSerialPortLogsToCloudLogging(w, ii.getName())
}
// populate runs InstanceBase.populate on every instance of the step, GA
// entries first and then Beta entries. That preprocessing covers Name,
// Project, Zone, Description, MachineType, NetworkInterfaces, Scopes,
// ServiceAccounts, and daisyName:
//   - sets defaults
//   - extends short partial URLs to include "projects/<project>"
//
// Errors from every instance are combined with addErrs and returned together.
func (ci *CreateInstances) populate(ctx context.Context, s *Step) DError {
	var allErrs DError
	// Ranging over a nil slice is a no-op, so no explicit nil checks are needed.
	for _, inst := range ci.Instances {
		allErrs = addErrs(allErrs, inst.InstanceBase.populate(ctx, inst, s))
	}
	for _, inst := range ci.InstancesBeta {
		allErrs = addErrs(allErrs, inst.InstanceBase.populate(ctx, inst, s))
	}
	return allErrs
}
// validate checks only the collection that run will create. It uses
// InstancesBeta when instanceUsesBetaFeatures reports true, and Instances
// otherwise. Errors from every instance are combined with addErrs.
func (ci *CreateInstances) validate(ctx context.Context, s *Step) DError {
	var allErrs DError
	if !ci.instanceUsesBetaFeatures() {
		for _, inst := range ci.Instances {
			allErrs = addErrs(allErrs, inst.InstanceBase.validate(ctx, inst, s))
		}
		return allErrs
	}
	for _, inst := range ci.InstancesBeta {
		allErrs = addErrs(allErrs, inst.InstanceBase.validate(ctx, inst, s))
	}
	return allErrs
}
// run creates every instance of the step concurrently. It returns the first
// creation error, or nil once all instances have been created.
//
// The collection to create (Instances or InstancesBeta) is chosen by
// instanceUsesBetaFeatures. For each instance it:
//   - deletes an existing instance first when OverWrite is set (a 404 from
//     the delete is ignored);
//   - replaces SourceMachineImage with the link of the workflow's matching
//     machine image, if one is registered;
//   - creates the instance, retrying once in no-external-IP mode when
//     RetryWhenExternalIPDenied is set and an organization policy denies
//     external IPs;
//   - starts a serial-port logging goroutine for each port in
//     SerialPortsToLog.
//
// If w.Cancel fires first, run waits for in-flight creations to finish, so
// they can be cleaned up, and returns nil.
func (ci *CreateInstances) run(ctx context.Context, s *Step) DError {
	var wg sync.WaitGroup
	w := s.w

	useBeta := ci.instanceUsesBetaFeatures()
	count := len(ci.Instances)
	if useBeta {
		count = len(ci.InstancesBeta)
	}

	// Each creation goroutine sends at most one error, and the waiter below
	// sends one final nil, so this capacity guarantees no send ever blocks.
	// That matters after the first value has been received, and on the
	// cancel path. A goroutine's error send completes before its deferred
	// wg.Done runs, so every error is enqueued ahead of the final nil. The
	// first value received is therefore an error whenever any creation failed.
	eChan := make(chan DError, count+1)

	createInstance := func(ii InstanceInterface, ib *InstanceBase) {
		// Deferred first so that every return path, including a failed
		// OverWrite delete, releases the WaitGroup.
		defer wg.Done()

		// Just try to delete it, a 404 here indicates the instance doesn't exist.
		if ib.OverWrite {
			if err := ii.delete(w.ComputeClient, true); err != nil {
				if apiErr, ok := err.(*googleapi.Error); !ok || apiErr.Code != 404 {
					eChan <- Errf("error deleting existing instance: %v", err)
					return
				}
			}
		}

		// Get the source machine image link if using a source machine image.
		if ii.getSourceMachineImage() != "" {
			if image, ok := w.machineImages.get(ii.getSourceMachineImage()); ok {
				ii.setSourceMachineImage(image.link)
			}
		}

		ii.updateDisksAndNetworksBeforeCreate(w)
		w.LogStepInfo(s.name, "CreateInstances", "Creating instance %q.", ii.getName())
		if err := ii.create(w.ComputeClient); err != nil {
			// Fallback to no-external-ip mode to workaround organization policy.
			if ib.RetryWhenExternalIPDenied && isExternalIPDeniedByOrganizationPolicy(err) {
				w.LogStepInfo(s.name, "CreateInstances", "Falling back to no-external-ip mode "+
					"for creating instance %v due to the fact that external IP is denied by organization policy.", ii.getName())
				UpdateInstanceNoExternalIP(s)
				err = ii.create(w.ComputeClient)
			}
			if err != nil {
				eChan <- newErr("failed to create instances", err)
				return
			}
		}
		ib.createdInWorkflow = true
		for _, port := range ib.SerialPortsToLog {
			go logSerialOutput(ctx, s, ii, ib, port, 3*time.Second)
		}
	}

	if useBeta {
		for _, i := range ci.InstancesBeta {
			wg.Add(1)
			go createInstance(i, &i.InstanceBase)
		}
	} else {
		for _, i := range ci.Instances {
			wg.Add(1)
			go createInstance(i, &i.InstanceBase)
		}
	}

	go func() {
		wg.Wait()
		eChan <- nil
	}()

	select {
	case err := <-eChan:
		return err
	case <-w.Cancel:
		// Wait so instances being created now can be deleted.
		wg.Wait()
		return nil
	}
}
// instanceUsesBetaFeatures reports whether the step must use the Beta
// collection (InstancesBeta). That is the case when there are no GA
// instances, or when any non-nil Beta instance sets SourceMachineImage.
func (ci *CreateInstances) instanceUsesBetaFeatures() bool {
	// With an empty GA collection, Beta is the only option.
	if len(ci.Instances) == 0 {
		return true
	}
	for _, beta := range ci.InstancesBeta {
		if beta == nil {
			continue
		}
		if beta.SourceMachineImage != "" {
			return true
		}
	}
	return false
}
// isExternalIPDeniedByOrganizationPolicy reports whether err is a
// *googleapi.Error with HTTP status 412 (Precondition Failed) whose message
// mentions the "constraints/compute.vmExternalIpAccess" organization policy
// constraint.
func isExternalIPDeniedByOrganizationPolicy(err error) bool {
	apiErr, isAPIErr := err.(*googleapi.Error)
	if !isAPIErr || apiErr.Code != http.StatusPreconditionFailed {
		return false
	}
	return strings.Contains(apiErr.Message, "constraints/compute.vmExternalIpAccess")
}