
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package imagetest is a library for the orchestration components of CIT. It
// executes testworkflows (generated by test suites) and collates the results
// into junit XML.
package imagetest

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/storage"
	"github.com/GoogleCloudPlatform/cloud-image-tests/cleanerupper"
	"github.com/GoogleCloudPlatform/cloud-image-tests/utils"
	daisy "github.com/GoogleCloudPlatform/compute-daisy"
	daisycompute "github.com/GoogleCloudPlatform/compute-daisy/compute"
	"github.com/jstemmer/go-junit-report/v2/junit"
	computeBeta "google.golang.org/api/compute/v0.beta"
	"google.golang.org/api/compute/v1"
	"google.golang.org/api/iterator"
)

var (
	client *storage.Client
)

const (
	// PdStandard disktype string
	PdStandard = "pd-standard"
	// PdSsd disktype string
	PdSsd = "pd-ssd"
	// PdBalanced disktype string
	PdBalanced = "pd-balanced"
	// PdExtreme disktype string
	PdExtreme = "pd-extreme"
	// HyperdiskExtreme disktype string
	HyperdiskExtreme = "hyperdisk-extreme"
	// HyperdiskThroughput disktype string
	HyperdiskThroughput = "hyperdisk-throughput"
	// HyperdiskBalanced disktype string
	HyperdiskBalanced = "hyperdisk-balanced"
	// LocalSsd disktype string
	LocalSsd = "local-ssd"

	testWrapperPath        = "/wrapper"
	testWrapperPathWindows = "/wrapp"
)

// TestWorkflowOpts is an options struct for the NewTestWorkflow function.
type TestWorkflowOpts struct {
	// Client is the client used to call the compute service.
	Client daisycompute.Client
	// ComputeEndpointOverride is an alternate endpoint to send compute service
	// requests to.
	ComputeEndpointOverride string
	// Name is the name of the TestWorkflow.
	Name string
	// Image is an image partial URL, or an image family partial URL.
	Image string
	// Timeout is the daisy step timeout.
	Timeout string
	// Project is the project used to look up images and set up the test workflow.
	Project string
	// Zone is the zone used to run the test workflow.
	Zone string
	// ExcludeFilter is a filter used to exclude individual test cases within a
	// test suite. It can break test suites if used incorrectly; be careful.
	ExcludeFilter string
	// X86Shape is the default shape for x86 images being tested.
	X86Shape string
	// ARM64Shape is the default shape for ARM images being tested.
	ARM64Shape string
	// UseReservations indicates whether to consume reservations when creating
	// VMs. If true but ReservationURLs is empty, ANY_RESERVATION is used.
	UseReservations bool
	// ReservationURLs is a list of specific reservation URLs to consume.
	ReservationURLs []string
	// AcceleratorType is the accelerator type to be used for accelerator
	// tests, which use GPUs.
	AcceleratorType string
}
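
// For illustration, a minimal sketch of how these options are typically
// filled in; the client variable, project, zone, and shapes below are assumed
// placeholder values, not defaults from this package:
//
//	twf, err := NewTestWorkflow(&TestWorkflowOpts{
//		Client:     computeClient, // a daisycompute.Client
//		Name:       "mysuite",
//		Image:      "projects/debian-cloud/global/images/family/debian-12",
//		Timeout:    "30m",
//		Project:    "my-test-project",
//		Zone:       "us-central1-a",
//		X86Shape:   "n1-standard-1",
//		ARM64Shape: "t2a-standard-1",
//	})
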
// TestWorkflow defines a test workflow which creates at least one test VM.
type TestWorkflow struct {
	Name string
	// Client is a shared client for the compute service.
	Client daisycompute.Client
	// Image is the image under test.
	Image *compute.Image
	// ImageBeta is the image under test using the beta API.
	ImageBeta *computeBeta.Image
	// ImageURL is the partial URL of a GCE image.
	ImageURL string
	// MachineType is the machine type to be used for the test. This can be
	// overridden by individual test suites.
	MachineType *compute.MachineType
	Project     *compute.Project
	Zone        *compute.Zone
	// GCSPath is the destination for workflow outputs in gs://[...] form.
	GCSPath string

	skipped           bool
	skippedMessage    string
	testExcludeFilter string
	wf                *daisy.Workflow
	// counter is a global counter for all daisy steps on all VMs. This is an
	// interim solution to prevent step-name collisions.
	counter int
	// lockProject indicates whether this test requires exclusive use of a project.
	lockProject bool
	// ReservationAffinity is the reservation affinity used for VM creation.
	ReservationAffinity *compute.ReservationAffinity
	// ReservationAffinityBeta is the reservation affinity used for VM creation
	// with the beta API.
	ReservationAffinityBeta *computeBeta.ReservationAffinity
	// AcceleratorType is the accelerator type to be used for accelerator
	// tests, which use GPUs.
	AcceleratorType string
}

func (t *TestWorkflow) setInstanceTestMetadata(instance *daisy.Instance, suffix string) {
	name := instance.Name
	instance.Metadata["_test_vmname"] = name
	instance.Metadata["_test_package_url"] = "${SOURCESPATH}/testpackage"
	instance.Metadata["_test_properties_url"] = fmt.Sprintf("${OUTSPATH}/properties/%s.txt", name)
	instance.Metadata["_test_package_name"] = fmt.Sprintf("image_test%s", suffix)
	instance.Metadata["_test_results_url"] = fmt.Sprintf("${OUTSPATH}/%s.txt", name)
	instance.Metadata["_test_suite_name"] = getTestSuiteName(t)
	instance.Metadata["_compute_endpoint"] = t.wf.ComputeEndpoint
	instance.Metadata["_cit_timeout"] = t.wf.DefaultTimeout
	instance.Metadata["_exclude_discrete_tests"] = t.testExcludeFilter
}
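
// For reference, with a VM named "vm1" (an assumed example name) on a Linux
// image, the metadata set above resolves roughly to:
//
//	_test_vmname         = "vm1"
//	_test_package_url    = "${SOURCESPATH}/testpackage"
//	_test_properties_url = "${OUTSPATH}/properties/vm1.txt"
//	_test_package_name   = "image_test" // "image_test.exe" on Windows
//	_test_results_url    = "${OUTSPATH}/vm1.txt"
//
// The ${SOURCESPATH} and ${OUTSPATH} variables are expanded by daisy at run
// time.
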
// addNewVMStep adds an entirely new create-VMs step, separate from the step
// used and modified by the appendCreateVMStep function.
func (t *TestWorkflow) addNewVMStep(disks []*compute.Disk, instanceParams *daisy.Instance) (*daisy.Step, *daisy.Instance, error) {
	stepSuffix := fmt.Sprintf("%d", t.counter)
	if len(disks) == 0 || disks[0].Name == "" {
		return nil, nil, fmt.Errorf("failed to create VM from empty boot disk")
	}
	// The boot disk is the first disk, and the VM name comes from it.
	name := disks[0].Name
	if strings.Contains(name, "-") {
		return nil, nil, fmt.Errorf("dashes are disallowed in testworkflow vm names: %s", name)
	}

	isWindows := utils.HasFeature(t.Image, "WINDOWS")
	var suffix string
	if isWindows {
		suffix = ".exe"
	}

	instance := instanceParams
	if instance == nil {
		instance = &daisy.Instance{}
	}
	instance.StartupScript = fmt.Sprintf("wrapper%s", suffix)
	instance.Name = name
	instance.Scopes = append(instance.Scopes, "https://www.googleapis.com/auth/devstorage.read_write")

	for _, disk := range disks {
		currentDisk := &compute.AttachedDisk{Source: disk.Name, AutoDelete: true}
		instance.Disks = append(instance.Disks, currentDisk)
	}

	if instance.Metadata == nil {
		instance.Metadata = make(map[string]string)
	}
	t.setInstanceTestMetadata(instance, suffix)
	t.skipWindowsStagingKMS(isWindows, instance)

	createInstances := &daisy.CreateInstances{}
	createInstances.Instances = append(createInstances.Instances, instance)

	createVMsStepName := fmt.Sprintf("create-vms-%s", stepSuffix)
	createVMsStep, err := t.wf.NewStep(createVMsStepName)
	if err != nil {
		return nil, nil, err
	}
	createVMsStep.CreateInstances = createInstances

	return createVMsStep, instance, nil
}

func (t *TestWorkflow) appendCreateVMStep(disks []*compute.Disk, instanceParams *daisy.Instance) (*daisy.Step, *daisy.Instance, error) {
	if len(disks) == 0 || disks[0].Name == "" {
		return nil, nil, fmt.Errorf("failed to create VM from empty boot disk")
	}
	// The boot disk is the first disk, and the VM name comes from it.
	name := disks[0].Name
	if strings.Contains(name, "-") {
		return nil, nil, fmt.Errorf("dashes are disallowed in testworkflow vm names: %s", name)
	}

	isWindows := utils.HasFeature(t.Image, "WINDOWS")
	var suffix string
	if isWindows {
		suffix = ".exe"
	}

	instance := instanceParams
	if instance == nil {
		instance = &daisy.Instance{}
	}
	instance.StartupScript = fmt.Sprintf("wrapper%s", suffix)
	instance.Name = name
	instance.Scopes = append(instance.Scopes, "https://www.googleapis.com/auth/devstorage.read_write")
	instance.ReservationAffinity = t.ReservationAffinity
	if t.ReservationAffinity != nil && t.ReservationAffinity.ConsumeReservationType == "SPECIFIC_RESERVATION" {
		if instance.Scheduling == nil {
			instance.Scheduling = &compute.Scheduling{}
		}
		instance.Scheduling.ProvisioningModel = "RESERVATION_BOUND"
	}

	for _, disk := range disks {
		currentDisk := &compute.AttachedDisk{Source: disk.Name, AutoDelete: true}
		instance.Disks = append(instance.Disks, currentDisk)
	}

	if instance.Metadata == nil {
		instance.Metadata = make(map[string]string)
	}
	t.setInstanceTestMetadata(instance, suffix)
	t.skipWindowsStagingKMS(isWindows, instance)

	createInstances := &daisy.CreateInstances{}
	createInstances.Instances = append(createInstances.Instances, instance)

	createVMStep, ok := t.wf.Steps[createVMsStepName]
	if ok {
		// Append to the existing step.
		createVMStep.CreateInstances.Instances = append(createVMStep.CreateInstances.Instances, instance)
	} else {
		var err error
		createVMStep, err = t.wf.NewStep(createVMsStepName)
		if err != nil {
			return nil, nil, err
		}
		createVMStep.CreateInstances = createInstances
	}

	return createVMStep, instance, nil
}

func (t *TestWorkflow) appendCreateVMStepBeta(disks []*compute.Disk, instance *daisy.InstanceBeta) (*daisy.Step, *daisy.InstanceBeta, error) {
	if len(disks) == 0 || disks[0].Name == "" {
		return nil, nil, fmt.Errorf("failed to create VM from empty boot disk")
	}
	// The boot disk is the first disk, and the VM name comes from it.
	name := disks[0].Name

	var suffix string
	if utils.HasFeature(t.Image, "WINDOWS") {
		suffix = ".exe"
	}

	if instance == nil {
		instance = &daisy.InstanceBeta{}
	}
	instance.StartupScript = fmt.Sprintf("wrapper%s", suffix)
	instance.Name = name
	instance.Scopes = append(instance.Scopes, "https://www.googleapis.com/auth/devstorage.read_write")
	instance.ReservationAffinity = t.ReservationAffinityBeta
	if t.ReservationAffinityBeta != nil && t.ReservationAffinityBeta.ConsumeReservationType == "SPECIFIC_RESERVATION" {
		if instance.Scheduling == nil {
			instance.Scheduling = &computeBeta.Scheduling{}
		}
		instance.Scheduling.ProvisioningModel = "RESERVATION_BOUND"
	}

	for _, disk := range disks {
		instance.Disks = append(instance.Disks, &computeBeta.AttachedDisk{Source: disk.Name, AutoDelete: true})
	}

	if instance.Metadata == nil {
		instance.Metadata = make(map[string]string)
	}
	instance.Metadata["_test_vmname"] = name
	instance.Metadata["_test_package_url"] = "${SOURCESPATH}/testpackage"
	instance.Metadata["_test_results_url"] = fmt.Sprintf("${OUTSPATH}/%s.txt", name)
	instance.Metadata["_test_properties_url"] = fmt.Sprintf("${OUTSPATH}/properties/%s.txt", name)
	instance.Metadata["_test_suite_name"] = getTestSuiteName(t)
	instance.Metadata["_test_package_name"] = fmt.Sprintf("image_test%s", suffix)
	instance.Metadata["_compute_endpoint"] = t.wf.ComputeEndpoint
	instance.Metadata["_cit_timeout"] = t.wf.DefaultTimeout
	instance.Metadata["_exclude_discrete_tests"] = t.testExcludeFilter

	createInstances := &daisy.CreateInstances{}
	createInstances.InstancesBeta = append(createInstances.InstancesBeta, instance)

	createVMStep, ok := t.wf.Steps[createVMsStepName]
	if ok {
		// Append to the existing step.
		createVMStep.CreateInstances.InstancesBeta = append(createVMStep.CreateInstances.InstancesBeta, instance)
	} else {
		var err error
		createVMStep, err = t.wf.NewStep(createVMsStepName)
		if err != nil {
			return nil, nil, err
		}
		createVMStep.CreateInstances = createInstances
	}

	return createVMStep, instance, nil
}
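
// A minimal sketch (with an assumed reservation URL) of the affinity the two
// append steps above consume when TestWorkflowOpts.UseReservations is set
// with explicit ReservationURLs; see NewTestWorkflow for where this is built:
//
//	&compute.ReservationAffinity{
//		ConsumeReservationType: "SPECIFIC_RESERVATION",
//		Key:                    "compute.googleapis.com/reservation-name",
//		Values:                 []string{"projects/p/zones/us-central1-a/reservations/my-reservation"},
//	}
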
// appendCreateDisksStep should be called for creating the boot disk, or the
// first disk in a VM.
func (t *TestWorkflow) appendCreateDisksStep(diskParams *compute.Disk) (*daisy.Step, error) {
	if diskParams == nil || diskParams.Name == "" {
		return nil, fmt.Errorf("failed to create disk with empty parameters")
	}
	bootdisk := &daisy.Disk{}
	bootdisk.Name = diskParams.Name
	bootdisk.SourceImage = t.ImageURL
	bootdisk.Type = diskParams.Type
	bootdisk.Zone = diskParams.Zone
	bootdisk.SizeGb = strconv.FormatInt(diskParams.SizeGb, 10)

	createDisks := &daisy.CreateDisks{bootdisk}

	createDisksStep, ok := t.wf.Steps[createDisksStepName]
	if ok {
		// Append to the existing step.
		*createDisksStep.CreateDisks = append(*createDisksStep.CreateDisks, bootdisk)
	} else {
		var err error
		createDisksStep, err = t.wf.NewStep(createDisksStepName)
		if err != nil {
			return nil, err
		}
		createDisksStep.CreateDisks = createDisks
	}

	return createDisksStep, nil
}

// appendCreateMountDisksStep should be called for any disk which is not the
// VM boot disk.
func (t *TestWorkflow) appendCreateMountDisksStep(diskParams *compute.Disk) (*daisy.Step, error) {
	if diskParams == nil || diskParams.Name == "" {
		return nil, fmt.Errorf("failed to create disk with empty parameters")
	}
	mountdisk := &daisy.Disk{}
	mountdisk.Name = diskParams.Name
	mountdisk.Type = diskParams.Type
	mountdisk.Zone = diskParams.Zone
	if diskParams.SizeGb == 0 {
		return nil, fmt.Errorf("failed to create mount disk with no SizeGb parameter")
	}
	mountdisk.SizeGb = strconv.FormatInt(diskParams.SizeGb, 10)

	createDisks := &daisy.CreateDisks{mountdisk}

	createDisksStep, ok := t.wf.Steps[createDisksStepName]
	if ok {
		// Append to the existing step.
		*createDisksStep.CreateDisks = append(*createDisksStep.CreateDisks, mountdisk)
	} else {
		var err error
		createDisksStep, err = t.wf.NewStep(createDisksStepName)
		if err != nil {
			return nil, err
		}
		createDisksStep.CreateDisks = createDisks
	}

	return createDisksStep, nil
}

func (t *TestWorkflow) appendCreateImageStep(stepname string, image *compute.Image) (*daisy.Step, error) {
	createImages := &daisy.Image{}
	createImages.Name = image.Name
	createImages.SourceDisk = image.SourceDisk

	createImagesStep, ok := t.wf.Steps[stepname]
	if ok {
		// Append to the existing step.
		createImagesStep.CreateImages.Images = append(createImagesStep.CreateImages.Images, createImages)
	} else {
		var err error
		createImagesStep, err = t.wf.NewStep(stepname)
		if err != nil {
			return nil, err
		}
		createImagesStep.CreateImages.Images = []*daisy.Image{createImages}
	}

	return createImagesStep, nil
}
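
// Illustrative only ("vm1" and the size below are assumed values): a boot
// disk passed to appendCreateDisksStep needs at least a Name, which also
// becomes the VM name in the appendCreateVMStep helpers above; a mount disk
// passed to appendCreateMountDisksStep must additionally set a nonzero
// SizeGb:
//
//	bootDisk := &compute.Disk{Name: "vm1"}
//	dataDisk := &compute.Disk{Name: "vm1data", Type: PdSsd, SizeGb: 100}
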
// appendDetachDiskStep detaches all the specified disks from the specified VM.
func (t *TestWorkflow) appendDetachDiskStep(stepname, vmname string, disknames []string) (*daisy.Step, error) {
	disks := &daisy.DetachDisks{}
	for _, diskname := range disknames {
		detachDisk := &daisy.DetachDisk{}
		detachDisk.Instance = vmname
		detachDisk.DeviceName = diskname
		*disks = append(*disks, detachDisk)
	}

	detachDisksStep, err := t.wf.NewStep(stepname)
	if err != nil {
		return nil, err
	}
	detachDisksStep.DetachDisks = disks

	return detachDisksStep, nil
}

func (t *TestWorkflow) addWaitStoppedStep(stepname, vmname string) (*daisy.Step, error) {
	instanceSignal := &daisy.InstanceSignal{}
	instanceSignal.Name = vmname
	instanceSignal.Stopped = true

	waitForInstances := &daisy.WaitForInstancesSignal{instanceSignal}

	waitStep, err := t.wf.NewStep("wait-stopped-" + stepname)
	if err != nil {
		return nil, err
	}
	waitStep.WaitForInstancesSignal = waitForInstances

	return waitStep, nil
}

func (t *TestWorkflow) addWaitStep(stepname, vmname string) (*daisy.Step, error) {
	serialOutput := &daisy.SerialOutput{}
	serialOutput.Port = 1
	serialOutput.SuccessMatch = successMatch

	instanceSignal := &daisy.InstanceSignal{}
	instanceSignal.Name = vmname
	instanceSignal.Stopped = false

	guestAttribute := &daisy.GuestAttribute{}
	guestAttribute.Namespace = utils.GuestAttributeTestNamespace
	guestAttribute.KeyName = utils.GuestAttributeTestKey

	instanceSignal.SerialOutput = serialOutput
	instanceSignal.GuestAttribute = guestAttribute
	instanceSignal.Interval = "8s"

	waitForInstances := &daisy.WaitForInstancesSignal{instanceSignal}

	waitStep, err := t.wf.NewStep("wait-" + stepname)
	if err != nil {
		return nil, err
	}
	waitStep.WaitForInstancesSignal = waitForInstances

	return waitStep, nil
}
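
// Taken together, a "wait-" step built above completes when the instance
// either prints successMatch on serial port 1 or writes the guest attribute
// utils.GuestAttributeTestNamespace/utils.GuestAttributeTestKey, polled at
// the 8s interval set on the signal.
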
// addWaitRebootGAStep is like addWaitStep, but (once guest attribute matching
// for instance wait steps is implemented) waits on a different guest
// attribute key than addWaitStep, so that test results are read from the boot
// that follows a reboot.
func (t *TestWorkflow) addWaitRebootGAStep(stepname, vmname string) (*daisy.Step, error) {
	serialOutput := &daisy.SerialOutput{}
	serialOutput.Port = 1
	serialOutput.SuccessMatch = successMatch

	instanceSignal := &daisy.InstanceSignal{}
	instanceSignal.Name = vmname
	instanceSignal.Stopped = false

	guestAttribute := &daisy.GuestAttribute{}
	guestAttribute.Namespace = utils.GuestAttributeTestNamespace
	// Specifically wait for a different guest attribute if this is the first
	// boot before a reboot, and we want test results from after the reboot.
	guestAttribute.KeyName = utils.FirstBootGAKey

	instanceSignal.SerialOutput = serialOutput
	instanceSignal.GuestAttribute = guestAttribute
	instanceSignal.Interval = "8s"

	waitForInstances := &daisy.WaitForInstancesSignal{instanceSignal}

	waitStep, err := t.wf.NewStep("wait-" + stepname)
	if err != nil {
		return nil, err
	}
	waitStep.WaitForInstancesSignal = waitForInstances

	return waitStep, nil
}

func (t *TestWorkflow) addStopStep(stepname, vmname string) (*daisy.Step, error) {
	stopInstances := &daisy.StopInstances{}
	stopInstances.Instances = append(stopInstances.Instances, vmname)

	stopInstancesStep, err := t.wf.NewStep("stop-" + stepname)
	if err != nil {
		return nil, err
	}
	stopInstancesStep.StopInstances = stopInstances

	return stopInstancesStep, nil
}

func (t *TestWorkflow) addDiskResizeStep(stepname, vmname string, diskSize int) (*daisy.Step, error) {
	resizeDisk := &daisy.ResizeDisk{}
	resizeDisk.DisksResizeRequest.SizeGb = int64(diskSize)
	resizeDisk.Name = vmname

	resizeDiskStepName := "resize-disk-" + stepname
	resizeDiskStep, err := t.wf.NewStep(resizeDiskStepName)
	if err != nil {
		return nil, err
	}
	resizeDiskStep.ResizeDisks = &daisy.ResizeDisks{resizeDisk}

	return resizeDiskStep, nil
}

func (t *TestWorkflow) addStartStep(stepname, vmname string) (*daisy.Step, error) {
	startInstances := &daisy.StartInstances{}
	startInstances.Instances = append(startInstances.Instances, vmname)

	startInstancesStep, err := t.wf.NewStep("start-" + stepname)
	if err != nil {
		return nil, err
	}
	startInstancesStep.StartInstances = startInstances

	return startInstancesStep, nil
}

func (t *TestWorkflow) appendCreateNetworkStep(network *daisy.Network) (*daisy.Step, *daisy.Network, error) {
	createNetworks := &daisy.CreateNetworks{}
	*createNetworks = append(*createNetworks, network)

	createNetworkStep, ok := t.wf.Steps[createNetworkStepName]
	if ok {
		// Append to the existing step.
		*createNetworkStep.CreateNetworks = append(*createNetworkStep.CreateNetworks, network)
	} else {
		var err error
		createNetworkStep, err = t.wf.NewStep(createNetworkStepName)
		if err != nil {
			return nil, nil, err
		}
		createNetworkStep.CreateNetworks = createNetworks
	}

	return createNetworkStep, network, nil
}

func (t *TestWorkflow) appendCreateSubnetworksStep(subnetwork *daisy.Subnetwork) (*daisy.Step, *daisy.Subnetwork, error) {
	createSubnetworks := &daisy.CreateSubnetworks{}
	*createSubnetworks = append(*createSubnetworks, subnetwork)

	createSubnetworksStep, ok := t.wf.Steps[createSubnetworkStepName]
	if ok {
		// Append to the existing step.
		*createSubnetworksStep.CreateSubnetworks = append(*createSubnetworksStep.CreateSubnetworks, subnetwork)
	} else {
		var err error
		createSubnetworksStep, err = t.wf.NewStep(createSubnetworkStepName)
		if err != nil {
			return nil, nil, err
		}
		createSubnetworksStep.CreateSubnetworks = createSubnetworks
	}

	return createSubnetworksStep, subnetwork, nil
}

func getGCSPrefix(ctx context.Context, storageClient *storage.Client, project, gcsPath string) (string, error) {
	// Set the global client.
	client = storageClient
	// If the user didn't specify gcsPath, detect or create the bucket.
	if gcsPath == "" {
		bucket, err := daisyBucket(ctx, client, project)
		if err != nil {
			return "", fmt.Errorf("failed to find or create daisy bucket: %v", err)
		}
		gcsPath = fmt.Sprintf("gs://%s", bucket)
	}
	return fmt.Sprintf("%s/%s", strings.TrimSuffix(gcsPath, "/"), time.Now().Format(time.RFC3339)), nil
}
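
// Illustrative output (assumed bucket name and timestamp): with gcsPath set
// to "gs://my-bucket/", getGCSPrefix returns something like
// "gs://my-bucket/2021-04-20T11:44:08-07:00"; finalizeWorkflows below then
// appends "/<test name>/<image name>" per workflow.
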
// finalizeWorkflows adds the final data each workflow needs to run, including
// the final copy-objects step.
func finalizeWorkflows(ctx context.Context, tests []*TestWorkflow, zone, gcsPrefix, localPath string) error {
	log.Printf("Storing artifacts and logs in %s", gcsPrefix)
	for _, twf := range tests {
		if twf.wf == nil {
			return fmt.Errorf("found nil workflow in finalize")
		}

		twf.wf.StorageClient = client

		// $GCS_PATH/2021-04-20T11:44:08-07:00/image_validation/debian-10
		twf.GCSPath = fmt.Sprintf("%s/%s/%s", gcsPrefix, twf.Name, twf.Image.Name)
		twf.wf.GCSPath = twf.GCSPath

		twf.wf.Zone = zone

		// Process quota steps and associated creation steps.
		for quotaStepName, createStepName := range map[string]string{
			waitForVMQuotaStepName:    createVMsStepName,
			waitForDisksQuotaStepName: createDisksStepName,
		} {
			quotaStep, ok := twf.wf.Steps[quotaStepName]
			if !ok {
				continue
			}
			for _, q := range quotaStep.WaitForAvailableQuotas.Quotas {
				// Populate empty regions with a best guess from the zone.
				if q.Region == "" {
					lastIndex := strings.LastIndex(twf.wf.Zone, "-")
					q.Region = twf.wf.Zone[:lastIndex]
				}
			}
			createStep, ok := twf.wf.Steps[createStepName]
			if !ok {
				continue
			}
			// Fix dependencies: create steps should depend on the quota step,
			// and quota steps should inherit all other dependencies.
			for _, dep := range twf.wf.Dependencies[createStepName] {
				dStep, ok := twf.wf.Steps[dep]
				if ok {
					if err := twf.wf.AddDependency(quotaStep, dStep); err != nil {
						return err
					}
				}
			}
			if err := twf.wf.AddDependency(createStep, quotaStep); err != nil {
				return err
			}
		}

		// Assume amd64 when arch is not set.
		arch := "amd64"
		if twf.Image.Architecture == "ARM64" {
			arch = "arm64"
		}

		createDisksStep, createDisksOk := twf.wf.Steps[createDisksStepName]
		createVMsStep, ok := twf.wf.Steps[createVMsStepName]
		if ok {
			for _, vm := range createVMsStep.CreateInstances.Instances {
				if vm.MachineType != "" {
					log.Printf("VM %s machine type set to %s for test %s\n", vm.Name, vm.MachineType, twf.Name)
				} else {
					vm.MachineType = twf.MachineType.Name
				}
				if vm.Zone != "" && vm.Zone != twf.wf.Zone {
					log.Printf("VM %s zone is set to %s, differing from workflow zone %s for test %s, not overriding\n", vm.Name, vm.Zone, twf.wf.Zone, twf.Name)
				}
				if createDisksOk && (strings.HasPrefix(vm.MachineType, "c4-") || strings.HasPrefix(vm.MachineType, "n4-") || strings.HasPrefix(vm.MachineType, "c3-")) {
					for _, attachedDisk := range vm.Disks {
						for _, disk := range *createDisksStep.CreateDisks {
							if attachedDisk.Source == disk.Name && disk.Type == "" {
								disk.Type = HyperdiskBalanced
							}
						}
					}
				}
				// Correct InitializeParams disk types for custom attached
				// disks. An empty disk type is left for daisy to default.
				for _, attachedDisk := range vm.Disks {
					if attachedDisk.InitializeParams != nil {
						if attachedDisk.InitializeParams.DiskType != "" && !strings.HasPrefix(attachedDisk.InitializeParams.DiskType, "zones/") {
							// Qualify the disk type with the zone if it's missing.
							attachedDisk.InitializeParams.DiskType = fmt.Sprintf("zones/%s/diskTypes/%s", vm.Zone, attachedDisk.InitializeParams.DiskType)
						}
					}
				}
			}
		}

		if utils.HasFeature(twf.Image, "WINDOWS") {
			archBits := "64"
			if strings.Contains(twf.ImageURL, "x86") {
				archBits = "32"
			}
			twf.wf.Sources["testpackage"] = fmt.Sprintf("%s/%s%s.exe", localPath, twf.Name, archBits)
			twf.wf.Sources["wrapper.exe"] = fmt.Sprintf("%s/%s%s.exe", localPath, testWrapperPathWindows, archBits)
		} else {
			twf.wf.Sources["testpackage"] = fmt.Sprintf("%s/%s.%s.test", localPath, twf.Name, arch)
			twf.wf.Sources["wrapper"] = fmt.Sprintf("%s%s.%s", localPath, testWrapperPath, arch)
		}

		// Add a final copy-objects step which copies the daisy-outs-path
		// directory to twf.GCSPath + "/outs".
		copyGCSObject := daisy.CopyGCSObject{}
		copyGCSObject.Source = "${OUTSPATH}/" // The trailing slash is apparently crucial.
		copyGCSObject.Destination = twf.GCSPath + "/outs"
		copyGCSObjects := &daisy.CopyGCSObjects{copyGCSObject}
		copyStep, err := twf.wf.NewStep("copy-objects")
		if err != nil {
			return fmt.Errorf("failed to add copy-objects step to workflow %s: %v", twf.Name, err)
		}
		copyStep.CopyGCSObjects = copyGCSObjects

		// The copy-objects step depends on every wait step.
		for stepname, step := range twf.wf.Steps {
			if !strings.HasPrefix(stepname, "wait-") {
				continue
			}
			if err := twf.wf.AddDependency(copyStep, step); err != nil {
				return fmt.Errorf("failed to add copy-objects step: %v", err)
			}
		}
	}

	return nil
}

type testResult struct {
	testWorkflow    *TestWorkflow
	skipped         bool
	workflowSuccess bool
	err             error
	results         []string
}

func getTestResults(ctx context.Context, ts *TestWorkflow) ([]string, error) {
	results := []string{}
	createVMsStep, ok := ts.wf.Steps[createVMsStepName]
	if ok {
		for _, vm := range createVMsStep.CreateInstances.Instances {
			out, err := utils.DownloadGCSObject(ctx, client, vm.Metadata["_test_results_url"])
			if err != nil {
				return nil, fmt.Errorf("failed to get results for test %s vm %s: %v", ts.Name, vm.Name, err)
			}
			results = append(results, string(out))
		}
		for _, vm := range createVMsStep.CreateInstances.InstancesBeta {
			out, err := utils.DownloadGCSObject(ctx, client, vm.Metadata["_test_results_url"])
			if err != nil {
				return nil, fmt.Errorf("failed to get results for test %s vm %s: %v", ts.Name, vm.Name, err)
			}
			results = append(results, string(out))
		}
	}
	return results, nil
}

// NewTestWorkflow returns a new TestWorkflow.
func NewTestWorkflow(opts *TestWorkflowOpts) (*TestWorkflow, error) {
	t := &TestWorkflow{}
	t.counter = 0
	t.Name = opts.Name
	t.ImageURL = opts.Image
	t.Client = opts.Client
	t.testExcludeFilter = opts.ExcludeFilter

	if opts.UseReservations {
		reservationType := "ANY_RESERVATION"
		var reservationKey string
		if len(opts.ReservationURLs) > 0 {
			reservationKey = "compute.googleapis.com/reservation-name"
			reservationType = "SPECIFIC_RESERVATION"
		}
		t.ReservationAffinity = &compute.ReservationAffinity{ConsumeReservationType: reservationType, Values: opts.ReservationURLs, Key: reservationKey}
		t.ReservationAffinityBeta = &computeBeta.ReservationAffinity{ConsumeReservationType: reservationType, Values: opts.ReservationURLs, Key: reservationKey}
		t.AcceleratorType = opts.AcceleratorType
	}

	var err error
	t.Project, err = t.Client.GetProject(opts.Project)
	if err != nil {
		return nil, err
	}

	t.Zone, err = t.Client.GetZone(t.Project.Name, opts.Zone)
	if err != nil {
		return nil, err
	}

	// Initialize Image inside the TestWorkflow.
	split := strings.Split(opts.Image, "/")
	if strings.Contains(opts.Image, "family") {
		t.Image, err = t.Client.GetImageFromFamily(split[1], split[len(split)-1])
	} else {
		t.Image, err = t.Client.GetImage(split[1], split[len(split)-1])
	}
	if err != nil {
		return nil, err
	}

	// Initialize ImageBeta inside the TestWorkflow; this is required to
	// provide beta API support to the cvm test suite.
	if strings.Contains(opts.Image, "family") {
		t.ImageBeta, err = t.Client.GetImageFromFamilyBeta(split[1], split[len(split)-1])
	} else {
		t.ImageBeta, err = t.Client.GetImageBeta(split[1], split[len(split)-1])
	}
	if err != nil {
		return nil, err
	}

	if t.Image.Architecture == "ARM64" {
		t.MachineType, err = t.Client.GetMachineType(t.Project.Name, t.Zone.Name, opts.ARM64Shape)
	} else {
		t.MachineType, err = t.Client.GetMachineType(t.Project.Name, t.Zone.Name, opts.X86Shape)
	}
	if err != nil {
		return nil, err
	}

	t.wf = daisy.New()
	if opts.ComputeEndpointOverride != "" {
		t.wf.ComputeEndpoint = opts.ComputeEndpointOverride
	}
	t.wf.Name = strings.ReplaceAll(opts.Name, "_", "-")
	t.wf.DefaultTimeout = opts.Timeout
	t.wf.Zone = opts.Zone
	t.wf.DisableCloudLogging()
	t.wf.DisableStdoutLogging()

	return t, nil
}
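
// NewTestWorkflow expects Image in partial URL form; both of these
// (illustrative values) are accepted, with split[1] read as the image project
// and the last element as the image or family name:
//
//	projects/debian-cloud/global/images/family/debian-12
//	projects/debian-cloud/global/images/debian-12-bookworm-v20240102
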
// PrintTests prints all test workflows.
func PrintTests(ctx context.Context, storageClient *storage.Client, testWorkflows []*TestWorkflow, project, zone, gcsPath, localPath string) {
	gcsPrefix, err := getGCSPrefix(ctx, storageClient, project, gcsPath)
	if err != nil {
		log.Printf("Error determining GCS prefix: %v", err)
		gcsPrefix = ""
	}
	if err := finalizeWorkflows(ctx, testWorkflows, zone, gcsPrefix, localPath); err != nil {
		log.Printf("Error finalizing workflow: %v", err)
	}
	for _, test := range testWorkflows {
		if test.wf == nil {
			log.Printf("%s test on image %s: workflow was nil, skipping", test.Name, test.Image.Name)
			continue
		}
		test.wf.Print(ctx)
	}
}

// ValidateTests validates all test workflows.
func ValidateTests(ctx context.Context, storageClient *storage.Client, testWorkflows []*TestWorkflow, project, zone, gcsPath, localPath string) error {
	gcsPrefix, err := getGCSPrefix(ctx, storageClient, project, gcsPath)
	if err != nil {
		return err
	}
	if err := finalizeWorkflows(ctx, testWorkflows, zone, gcsPrefix, localPath); err != nil {
		return err
	}
	for _, test := range testWorkflows {
		log.Printf("Validating test %s on image %s\n", test.Name, test.Image.Name)
		if test.wf == nil {
			return fmt.Errorf("%s test on image %s: workflow was nil", test.Name, test.Image.Name)
		}
		if err := test.wf.Validate(ctx); err != nil {
			return err
		}
	}
	return nil
}

// daisyBucket returns the bucket name for outputs, creating it if needed.
func daisyBucket(ctx context.Context, client *storage.Client, project string) (string, error) {
	bucketName := strings.Replace(project, ":", "-", -1) + "-cloud-test-outputs"
	it := client.Buckets(ctx, project)
	for attr, err := it.Next(); err != iterator.Done; attr, err = it.Next() {
		if err != nil {
			return "", fmt.Errorf("failed to iterate buckets: %v", err)
		}
		if attr.Name == bucketName {
			return bucketName, nil
		}
	}
	if err := client.Bucket(bucketName).Create(ctx, project, nil); err != nil {
		return "", fmt.Errorf("failed to create bucket: %v", err)
	}
	return bucketName, nil
}

// testMetrics is a simple struct to track the progress of a test run.
type testMetrics struct {
	// total is the total number of tests.
	total int
	// running is the number of tests currently running.
	running int
	// finished is the number of tests that have finished.
	finished int
	// mu is a mutex to protect the metrics.
	mu sync.Mutex
}

// newTestMetrics returns a new testMetrics with total initialized to the
// given value.
func newTestMetrics(total int) *testMetrics {
	return &testMetrics{total: total}
}

// started increments the running count.
func (tm *testMetrics) started() {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	tm.running++
}

// done decrements the running count and increments the finished count.
func (tm *testMetrics) done() {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	tm.running--
	tm.finished++
}

// progress returns a string describing the current progress of the test
// execution.
func (tm *testMetrics) progress() string {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	return fmt.Sprintf("total: %d, running: %d, finished: %d, delta: %d", tm.total, tm.running, tm.finished, tm.total-tm.finished)
}

// RunTests runs all test workflows.
func RunTests(ctx context.Context, storageClient *storage.Client, testWorkflows []*TestWorkflow, project, zone, gcsPath, localPath string, parallelCount int, parallelStagger string, testProjects []string) (junit.Testsuites, error) {
	gcsPrefix, err := getGCSPrefix(ctx, storageClient, project, gcsPath)
	if err != nil {
		return junit.Testsuites{}, err
	}
	stagger, err := time.ParseDuration(parallelStagger)
	if err != nil {
		return junit.Testsuites{}, err
	}
	metrics := newTestMetrics(len(testWorkflows))
	if err := finalizeWorkflows(ctx, testWorkflows, zone, gcsPrefix, localPath); err != nil {
		return junit.Testsuites{}, err
	}

	testResults := make(chan testResult, len(testWorkflows))
	testchan := make(chan *TestWorkflow, len(testWorkflows))

	// Whenever we select a test project, we want to do so in a semi-random
	// order that is unpredictable but doesn't have the ability to place all
	// tests in a single project by chance (however small). This should
	// randomize our usage patterns in static invocations of CIT (eg CI
	// invocations) a bit more.
	exclusiveProjects := make(chan string, len(testProjects))
	// Select from testProjects in a random order, deleting each selection
	// afterwards to avoid picking a duplicate.
	nextProjects := make([]string, len(testProjects))
	copy(nextProjects, testProjects)
	for range testProjects {
		i := rand.Intn(len(nextProjects))
		exclusiveProjects <- nextProjects[i]
		nextProjects = append(nextProjects[:i], nextProjects[i+1:]...)
	}

	projects := make(chan string, len(testWorkflows))
	// Same technique as above, but this time we might have more workflows
	// than projects, so any time we exhaust the projects we reset to the
	// full list.
	nextProjects = make([]string, len(testProjects))
	copy(nextProjects, testProjects)
	for range testWorkflows {
		if len(nextProjects) < 1 {
			nextProjects = make([]string, len(testProjects))
			copy(nextProjects, testProjects)
		}
		i := rand.Intn(len(nextProjects))
		projects <- nextProjects[i]
		nextProjects = append(nextProjects[:i], nextProjects[i+1:]...)
	}
	close(projects)

	var wg sync.WaitGroup
	for i := 0; i < parallelCount; i++ {
		wg.Add(1)
		go func(metrics *testMetrics, id int) {
			defer wg.Done()
			time.Sleep(time.Duration(id) * stagger)
			for test := range testchan {
				if test.lockProject {
					// This will block until an exclusive project is available.
					log.Printf("test %s/%s requires write lock for project", test.Name, test.Image.Name)
					test.wf.Project = <-exclusiveProjects
				} else {
					test.wf.Project = <-projects
				}
				testResults <- runTestWorkflow(ctx, metrics, test)
				if test.lockProject {
					// "Unlock" the project.
					exclusiveProjects <- test.wf.Project
				}
			}
		}(metrics, i)
	}
	for _, ts := range testWorkflows {
		testchan <- ts
	}
	close(testchan)
	wg.Wait()

	var suites junit.Testsuites
	var runtime float64
	for i := 0; i < len(testWorkflows); i++ {
		suites.Suites = append(suites.Suites, parseResult(<-testResults, localPath))
	}
	for _, suite := range suites.Suites {
		suites.Errors += suite.Errors
		suites.Failures += suite.Failures
		suites.Tests += suite.Tests
		suites.Disabled += suite.Disabled
		suites.Skipped += suite.Skipped
		if secs, err := strconv.ParseFloat(suite.Time, 64); err == nil {
			runtime += secs
		}
	}
	suites.Time = fmt.Sprintf("%.3f", runtime)

	return suites, nil
}

func formatTimeDelta(format string, d time.Duration) string {
	z := time.Unix(0, 0).UTC()
	return z.Add(d).Format(format)
}
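
// For example, formatTimeDelta("04m 05s", 272*time.Second) formats the
// duration against the zero Unix time and yields "04m 32s".
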
func runTestWorkflow(ctx context.Context, metrics *testMetrics, test *TestWorkflow) testResult {
	metrics.started()
	res := testResult{testWorkflow: test}
	if test.skipped {
		res.skipped = true
		res.err = fmt.Errorf("test suite was skipped with message: %q", res.testWorkflow.SkippedMessage())
		return res
	}
	clean := func() {
		metrics.done()
		log.Printf("cleaning up after test %s/%s (ID %s) in project %s, progress: %s\n", test.Name, test.Image.Name, test.wf.ID(), test.wf.Project, metrics.progress())
		cleaned, errs := cleanTestWorkflow(test)
		for _, err := range errs {
			log.Printf("error cleaning test %s/%s: %v\n", test.Name, test.Image.Name, err)
		}
		if len(cleaned) > 0 {
			log.Printf("test %s/%s had %d leftover resources\n", test.Name, test.Image.Name, len(cleaned))
		}
		for _, c := range cleaned {
			log.Printf("deleted resource %s from test %s/%s", c, test.Name, test.Image.Name)
		}
	}
	defer clean()
	start := time.Now()
	log.Printf("running test %s/%s (ID %s) in project %s, progress: %s\n", test.Name, test.Image.Name, test.wf.ID(), test.wf.Project, metrics.progress())
	if err := test.wf.Run(ctx); err != nil {
		res.err = err
		return res
	}
	delta := formatTimeDelta("04m 05s", time.Since(start))
	log.Printf("finished test %s/%s (ID %s) in project %s, time spent: %s\n", test.Name, test.Image.Name, test.wf.ID(), test.wf.Project, delta)

	results, err := getTestResults(ctx, test)
	if err != nil {
		res.err = err
		return res
	}
	res.results = results
	res.workflowSuccess = true

	return res
}

func cleanTestWorkflow(test *TestWorkflow) (totalCleaned []string, totalErrs []error) {
	c := cleanerupper.Clients{Daisy: test.Client}
	policy := cleanerupper.WorkflowPolicy(test.wf.ID())

	cleaned, errs := cleanerupper.CleanInstances(c, test.wf.Project, policy, false)
	totalCleaned = append(totalCleaned, cleaned...)
	totalErrs = append(totalErrs, errs...)

	cleaned, errs = cleanerupper.CleanDisks(c, test.wf.Project, policy, false)
	totalCleaned = append(totalCleaned, cleaned...)
	totalErrs = append(totalErrs, errs...)

	cleaned, errs = cleanerupper.CleanLoadBalancerResources(c, test.wf.Project, policy, false)
	totalCleaned = append(totalCleaned, cleaned...)
	totalErrs = append(totalErrs, errs...)

	cleaned, errs = cleanerupper.CleanNetworks(c, test.wf.Project, policy, false)
	totalCleaned = append(totalCleaned, cleaned...)
	totalErrs = append(totalErrs, errs...)

	return
}

// parseResult takes a test result struct and converts it to a junit Testsuite.
func parseResult(res testResult, localPath string) junit.Testsuite {
	ret := junit.Testsuite{}
	name := getTestSuiteName(res.testWorkflow)
	switch {
	case res.skipped:
		for _, test := range getTestsBySuiteName(res.testWorkflow.Name, localPath) {
			tc := junit.Testcase{}
			tc.Classname = name
			tc.Name = test
			tc.Skipped = &junit.Result{Data: res.testWorkflow.SkippedMessage()}
			ret.Testcases = append(ret.Testcases, tc)
			ret.Tests++
			ret.Skipped++
		}
	case res.workflowSuccess:
		// The workflow completed without error. Only in this case do we try
		// to parse the result.
		ret = convertToTestSuite(res.results, name)
		// Tests handled by a suite but not executed or skipped should be
		// marked disabled.
		for _, test := range getTestsBySuiteName(res.testWorkflow.Name, localPath) {
			hasResult := false
			for _, tc := range ret.Testcases {
				if tc.Name == test {
					hasResult = true
					break
				}
			}
			if hasResult {
				continue
			}
			newTc := junit.Testcase{}
			newTc.Classname = name
			newTc.Name = test
			newTc.Skipped = &junit.Result{Data: fmt.Sprintf("%s disabled on %s", test, res.testWorkflow.ImageURL)}
			ret.Testcases = append(ret.Testcases, newTc)
			ret.Tests++
			ret.Skipped++
		}
		ret.AddProperty("image_family", res.testWorkflow.Image.Family)
		ret.AddProperty("image", res.testWorkflow.Image.SelfLink)
		ret.AddProperty("project", res.testWorkflow.Project.Name)
	default:
		var status string
		if res.err != nil {
			status = res.err.Error()
		} else {
			status = "Unknown status"
		}
		for _, test := range getTestsBySuiteName(res.testWorkflow.Name, localPath) {
			tc := junit.Testcase{}
			tc.Classname = name
			tc.Name = test
			tc.Failure = &junit.Result{Data: status, Type: "Failure"}
			ret.Testcases = append(ret.Testcases, tc)
			ret.Tests++
			ret.Failures++
		}
	}
	ret.Name = name
	if ret.Time == "" {
		ret.Time = "0.000"
	}
	return ret
}

// getTestSuiteName uses ImageURL rather than the image name or family so that
// results are displayed the same way the user entered them.
func getTestSuiteName(testWorkflow *TestWorkflow) string {
	split := strings.Split(testWorkflow.ImageURL, "/")
	return fmt.Sprintf("%s-%s", testWorkflow.Name, split[len(split)-1])
}

func getTestsBySuiteName(name, localPath string) []string {
	b, err := os.ReadFile(fmt.Sprintf("%s/%s_tests.txt", localPath, name))
	if err != nil {
		log.Fatalf("unable to parse tests list: %v", err)
		return []string{} // Unreachable after Fatalf, but NOT nil.
	}
	var res []string
	for _, testname := range strings.Split(string(b), "\n") {
		if strings.HasPrefix(testname, "Test") {
			res = append(res, testname)
		}
	}
	return res
}

func (t *TestWorkflow) getLastStepForVM(vmname string) (*daisy.Step, error) {
	step := "wait-" + vmname
	if _, ok := t.wf.Steps[step]; !ok {
		return nil, fmt.Errorf("no step %s", step)
	}
	rdeps := make(map[string][]string)
	for dependent, dependencies := range t.wf.Dependencies {
		for _, dependency := range dependencies {
			rdeps[dependency] = append(rdeps[dependency], dependent)
		}
	}

	for {
		deps, ok := rdeps[step]
		if !ok {
			// No more steps depend on this one.
			break
		}
		if len(deps) > 1 {
			return nil, fmt.Errorf("workflow has non-linear dependencies")
		}
		step = deps[0]
	}
	return t.wf.Steps[step], nil
}

func (t *TestWorkflow) skipWindowsStagingKMS(isWindows bool, instance *daisy.Instance) {
	if isWindows && t.wf.ComputeEndpoint == "https://www.googleapis.com/compute/staging_v1/" {
		if instance.Metadata == nil {
			instance.Metadata = make(map[string]string)
		}
		instance.Metadata["sysprep-specialize-script-ps1"] = `New-Item -Path "C:\Program Files\Google\Compute Engine\sysprep\byol_image" -ItemType directory`
	}
}
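
// Illustrative (assumed step names): for a VM "vm1" whose steps form the
// linear chain wait-vm1 -> stop-vm1 -> start-vm1, getLastStepForVM("vm1")
// walks the reverse dependency map from "wait-vm1" and returns the start-vm1
// step.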