cli_tools/common/image/importer/api_inflater.go

// Copyright 2020
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importer

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	daisy "github.com/GoogleCloudPlatform/compute-daisy"
	daisyCompute "github.com/GoogleCloudPlatform/compute-daisy/compute"
	"google.golang.org/api/compute/v1"
	"google.golang.org/api/googleapi"

	"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/domain"
	"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/utils/daisyutils"
	"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/utils/logging"
	"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/utils/param"
	"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/utils/storage"
)

// isCausedByUnsupportedFormat reports whether an inflation error indicates
// that the Compute Engine API rejected the source image file format.
func isCausedByUnsupportedFormat(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "INVALID_IMAGE_FILE")
}

// isCausedByAlphaAPIAccess reports whether an inflation error indicates
// that the project lacks the alpha API access the request required.
func isCausedByAlphaAPIAccess(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "Required 'Alpha Access' permission")
}

// apiInflater implements `importer.inflater` using the Compute Engine API.
type apiInflater struct {
	*apiInflaterProperties

	guestOsFeatures []*compute.GuestOsFeature
	wg              sync.WaitGroup
	cancelChan      chan string
}
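
// A minimal usage sketch (hypothetical caller, not part of this file),
// assuming a request, clients, and logger have already been constructed:
//
//	inflater := createAPIInflater(&apiInflaterProperties{
//		request:       request,       // ImageImportRequest describing the source file
//		computeClient: computeClient, // used for CreateDisk/GetDisk/DeleteDisk
//		storageClient: storageClient, // used to read source file attributes
//		logger:        logger,
//		needChecksum:  true,
//	})
//	pd, info, err := inflater.Inflate()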

// apiInflaterProperties covers properties of a new API inflater.
// "isShadowInflater" indicates whether it's used as a shadow inflater.
// "needChecksum" indicates whether a checksum of the output disk needs to be
// calculated. If the QEMU checksum failed to be calculated, there is no need
// to calculate the disk checksum either.
type apiInflaterProperties struct {
	request          ImageImportRequest
	computeClient    daisyCompute.Client
	storageClient    domain.StorageClientInterface
	logger           logging.Logger
	isShadowInflater bool
	needChecksum     bool
}

func createAPIInflater(properties *apiInflaterProperties) *apiInflater {
	inflater := apiInflater{
		apiInflaterProperties: properties,
		cancelChan:            make(chan string, 1),
	}
	if properties.request.UefiCompatible {
		inflater.guestOsFeatures = []*compute.GuestOsFeature{{Type: "UEFI_COMPATIBLE"}}
	}
	return &inflater
}

func (inflater *apiInflater) Inflate() (persistentDisk, inflationInfo, error) {
	if inflater.isShadowInflater {
		return inflater.inflateForShadowTest()
	}

	ctx := context.Background()
	startTime := time.Now()
	diskName := getDiskName(inflater.request.ExecutionID)

	cd, err := inflater.createDisk(diskName)
	if err != nil {
		return persistentDisk{}, inflationInfo{}, daisy.Errf("Failed to create disk by api inflater: %v", err)
	}

	pd, ii, err := inflater.getDiskAttributes(ctx, diskName, cd, startTime)
	if err != nil {
		return pd, ii, err
	}

	// Calculate checksum by daisy workflow
	ii.checksum, err = inflater.calculateChecksum(pd.uri)
	return pd, ii, err
}
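
// inflateForShadowTest performs the same API-based inflation as Inflate, but
// on a throwaway "shadow" disk that is always deleted before returning. The
// WaitGroup lets Cancel block until that cleanup has run, and the buffered
// cancelChan lets Cancel signal an early return before the 2min+ checksum
// calculation starts.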
func (inflater *apiInflater) inflateForShadowTest() (persistentDisk, inflationInfo, error) {
	inflater.wg.Add(1)
	defer inflater.wg.Done()

	ctx := context.Background()
	startTime := time.Now()
	diskName := inflater.getShadowDiskName()

	cd, err := inflater.createDisk(diskName)
	if err != nil {
		return persistentDisk{}, inflationInfo{}, daisy.Errf("Failed to create shadow disk: %v", err)
	}

	// Clean up the shadow disk, ignoring any error.
	defer inflater.computeClient.DeleteDisk(inflater.request.Project, inflater.request.Zone, cd.Name)

	// If a cancel signal was received from Cancel(), return early; otherwise
	// 2min+ would be wasted on calculateChecksum().
	select {
	case <-inflater.cancelChan:
		return persistentDisk{}, inflationInfo{}, nil
	default:
	}

	pd, ii, err := inflater.getDiskAttributes(ctx, diskName, cd, startTime)
	if err != nil {
		return pd, ii, err
	}

	// Calculate checksum by daisy workflow
	ii.checksum, err = inflater.calculateChecksum(pd.uri)
	return pd, ii, err
}

func (inflater *apiInflater) createDisk(diskName string) (compute.Disk, error) {
	diskType := fmt.Sprintf("projects/%s/zones/%s/diskTypes/pd-ssd", inflater.request.Project, inflater.request.Zone)
	cd := compute.Disk{
		Name:                diskName,
		SourceStorageObject: inflater.request.Source.Path(),
		GuestOsFeatures:     inflater.guestOsFeatures,
		Type:                diskType,
		Licenses:            []string{fmt.Sprintf("projects/%s/global/licenses/virtual-disk-import", param.ReleaseProject)},
	}
	err := inflater.computeClient.CreateDisk(inflater.request.Project, inflater.request.Zone, &cd)
	return cd, err
}

func (inflater *apiInflater) getDiskAttributes(ctx context.Context, diskName string, cd compute.Disk, startTime time.Time) (persistentDisk, inflationInfo, error) {
	// Prepare return value
	bkt, objPath, err := storage.GetGCSObjectPathElements(inflater.request.Source.Path())
	if err != nil {
		return persistentDisk{}, inflationInfo{}, err
	}
	sourceFile := inflater.storageClient.GetObject(bkt, objPath).GetObjectHandle()
	attrs, err := sourceFile.Attrs(ctx)
	if err != nil {
		return persistentDisk{}, inflationInfo{}, daisy.Errf("Failed to get source file attributes: %v", err)
	}
	// Round the source size up to whole GiB (1073741824 = 1 << 30 bytes).
	sourceFileSizeGb := (attrs.Size-1)/1073741824 + 1

	diskURI := fmt.Sprintf("zones/%s/disks/%s", inflater.request.Zone, diskName)
	pd := persistentDisk{
		uri:        diskURI,
		sizeGb:     cd.SizeGb,
		sourceGb:   sourceFileSizeGb,
		sourceType: "vmdk", // only vmdk is supported right now
	}
	ii := inflationInfo{
		inflationType: "api",
		inflationTime: time.Since(startTime),
	}
	return pd, ii, nil
}

func (inflater *apiInflater) getShadowDiskName() string {
	return fmt.Sprintf("shadow-disk-%v", inflater.request.ExecutionID)
}
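
// Cancel interrupts a shadow inflation in progress and verifies its cleanup:
// it returns true only when GetDisk confirms, via a 404, that the shadow
// disk no longer exists. A minimal sketch of the intended interaction
// (hypothetical caller and variable names, not part of this file):
//
//	go func() { pd, ii, err = shadow.Inflate() }()
//	if mainInflaterFinishedFirst {
//		cleaned := shadow.Cancel("main inflater finished first")
//		_ = cleaned
//	}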
func (inflater *apiInflater) Cancel(reason string) bool {
	if !inflater.isShadowInflater {
		// We don't have to do any actual cancellation for the single
		// CreateDisk API call. Only the daisy workflow is worth cancelling.
		return false
	}

	// Send cancel signal
	inflater.cancelChan <- reason

	// Wait for inflate() to finish. Otherwise, the whole program might exit
	// before DeleteDisk() was executed.
	inflater.wg.Wait()

	// Expect a 404 error to ensure the shadow disk has been cleaned up.
	_, err := inflater.computeClient.GetDisk(inflater.request.Project, inflater.request.Zone, inflater.getShadowDiskName())
	if apiErr, ok := err.(*googleapi.Error); !ok || apiErr.Code != 404 {
		if err == nil {
			inflater.logger.Debug(fmt.Sprintf("apiInflater.inflate is canceled, cleanup is failed: %v", reason))
		} else {
			inflater.logger.Debug(fmt.Sprintf("apiInflater.inflate is canceled, cleanup failed to verify: %v", reason))
		}
		return false
	}
	inflater.logger.Debug(fmt.Sprintf("apiInflater.inflate is canceled: %v", reason))
	return true
}

// calculateChecksum runs a daisy workflow to calculate the checksum of the
// inflated disk.
func (inflater *apiInflater) calculateChecksum(diskURI string) (string, error) {
	if !inflater.needChecksum {
		inflater.logger.Debug("Skipped checksum calculation.")
		return "", nil
	}

	inflater.logger.Debug("Started checksum calculation.")
	daisyPrefix := "api"
	if inflater.isShadowInflater {
		daisyPrefix = "shadow"
	}

	env := inflater.request.EnvironmentSettings()
	if env.DaisyLogLinePrefix != "" {
		env.DaisyLogLinePrefix += "-"
	}
	env.DaisyLogLinePrefix += fmt.Sprintf("%v-disk-checksum", daisyPrefix)
	worker := daisyutils.NewDaisyWorker(func() (*daisy.Workflow, error) {
		return inflater.getCalculateChecksumWorkflow(diskURI, daisyPrefix), nil
	}, env, inflater.logger)
	checksum, err := worker.RunAndReadSerialValue("disk-checksum", map[string]string{})
	if err != nil {
		err = daisy.Errf("Failed to calculate checksum: %v", err)
	}
	return checksum, err
}

func (inflater *apiInflater) getCalculateChecksumWorkflow(diskURI string, daisyPrefix string) *daisy.Workflow {
	w := daisy.New()
	w.Name = daisyPrefix + "-disk-checksum"
	checksumScript := checksumScriptConst
	w.Steps = map[string]*daisy.Step{
		"create-disks": {
			CreateDisks: &daisy.CreateDisks{
				{
					Disk: compute.Disk{
						Name:        "disk-${NAME}",
						SourceImage: "projects/compute-image-import/global/images/debian-9-worker-v20230926",
						Type:        "pd-ssd",
					},
				},
			},
		},
		"create-instance": {
			CreateInstances: &daisy.CreateInstances{
				Instances: []*daisy.Instance{
					{
						Instance: compute.Instance{
							Name: "inst-${NAME}",
							Disks: []*compute.AttachedDisk{
								{Source: "disk-${NAME}"},
								{Source: diskURI, Mode: "READ_ONLY"},
							},
							MachineType: "n1-highcpu-4",
							Metadata: &compute.Metadata{
								Items: []*compute.MetadataItems{
									{
										Key:   "startup-script",
										Value: &checksumScript,
									},
								},
							},
							NetworkInterfaces: []*compute.NetworkInterface{
								{
									AccessConfigs: []*compute.AccessConfig{},
									Network:       inflater.request.Network,
									Subnetwork:    inflater.request.Subnet,
								},
							},
							ServiceAccounts: []*compute.ServiceAccount{
								{
									Email:  "${compute_service_account}",
									Scopes: []string{"https://www.googleapis.com/auth/devstorage.read_write"},
								},
							},
						},
					},
				},
			},
		},
		"wait-for-checksum": {
			WaitForInstancesSignal: &daisy.WaitForInstancesSignal{
				{
					Name: "inst-${NAME}",
					SerialOutput: &daisy.SerialOutput{
						Port:         1,
						SuccessMatch: "Checksum calculated.",
						StatusMatch:  "Checksum:",
					},
				},
			},
		},
	}
	w.Dependencies = map[string][]string{
		"create-instance":   {"create-disks"},
		"wait-for-checksum": {"create-instance"},
	}

	// Calculate checksum within 20min.
	env := inflater.request.EnvironmentSettings()
	env.Timeout = "20m"
	env.ApplyToWorkflow(w)
	computeServiceAccount := "default"
	if inflater.request.ComputeServiceAccount != "" {
		computeServiceAccount = inflater.request.ComputeServiceAccount
	}
	w.AddVar("compute_service_account", computeServiceAccount)

	return w
}
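
// The wait-for-checksum step above watches serial port 1 of the worker
// instance: any line containing "Checksum:" counts as a status update, and
// "Checksum calculated." marks success. RunAndReadSerialValue("disk-checksum",
// ...) then extracts the value embedded in the <serial-output> tag that the
// startup script below emits. An illustrative serial line (md5 values
// shortened for readability):
//
//	Checksum: <serial-output key:'disk-checksum' value:'d41d8...  --c4ca4...  --...'>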

// The checksum script below duplicates logic in import_image.sh. If you
// change anything here, please change it in both places.
const (
	checksumScriptConst = `
	function serialOutputPrefixedKeyValue() {
		stdbuf -oL echo "$1: <serial-output key:'$2' value:'$3'>"
	}
	CHECK_DEVICE=sdb
	BLOCK_COUNT=$(cat /sys/class/block/$CHECK_DEVICE/size)

	# Check size = 200000*512 = 100MB
	CHECK_COUNT=200000
	CHECKSUM1=$(sudo dd if=/dev/$CHECK_DEVICE ibs=512 skip=0 count=$CHECK_COUNT | md5sum)
	CHECKSUM2=$(sudo dd if=/dev/$CHECK_DEVICE ibs=512 skip=$(( 2000000 - $CHECK_COUNT )) count=$CHECK_COUNT | md5sum)
	CHECKSUM3=$(sudo dd if=/dev/$CHECK_DEVICE ibs=512 skip=$(( 20000000 - $CHECK_COUNT )) count=$CHECK_COUNT | md5sum)
	CHECKSUM4=$(sudo dd if=/dev/$CHECK_DEVICE ibs=512 skip=$(( $BLOCK_COUNT - $CHECK_COUNT )) count=$CHECK_COUNT | md5sum)
	serialOutputPrefixedKeyValue "Checksum" "disk-checksum" "$CHECKSUM1-$CHECKSUM2-$CHECKSUM3-$CHECKSUM4"
	echo "Checksum calculated."`
)
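
// Note that the script does not hash the whole disk: it samples four 100MB
// windows of 512-byte blocks (at the start of the disk, ending at block
// 2000000 (~1GB), ending at block 20000000 (~10GB), and at the end of the
// disk) and joins the four md5sums into one value, a cheap fingerprint
// rather than a full-disk checksum.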