e2etest/runner.go (386 lines of code) (raw):

// Copyright © Microsoft <wastore@microsoft.com> // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package e2etest import ( "bytes" "encoding/json" "errors" "fmt" "os" "os/exec" "path/filepath" "reflect" "strconv" "strings" "github.com/Azure/azure-storage-azcopy/v10/common" ) // encapsulates the interaction with the AzCopy instance that is being tested // the flag names should be captured here so that in case they change, only 1 place needs to be updated type TestRunner struct { flags map[string]string } func newTestRunner() TestRunner { return TestRunner{flags: make(map[string]string)} } var isLaunchedByDebugger = func() bool { // gops executable must be in the path. See https://github.com/google/gops gopsOut, err := exec.Command("gops", strconv.Itoa(os.Getppid())).Output() if err == nil && strings.Contains(string(gopsOut), "\\dlv.exe") { // our parent process is (probably) the Delve debugger return true } return false }() func (t *TestRunner) SetAllFlags(s *scenario) { p := s.p o := s.operation set := func(key string, value interface{}, dflt interface{}, formats ...string) { if value == dflt { return // nothing to do. The flag is not supposed to be set } reflectVal := reflect.ValueOf(value) // check for pointer if reflectVal.Kind() == reflect.Pointer { result := reflectVal.Elem() // attempt to deref if result != (reflect.Value{}) && result.CanInterface() { // can we grab the underlying value? value = result.Interface() } else { return // nothing to use } } format := "%v" if len(formats) > 0 { format = formats[0] } t.flags[key] = fmt.Sprintf(format, value) } if o == eOperation.Benchmark() { set("mode", p.mode, "") set("file-count", p.fileCount, 0) set("size-per-file", p.sizePerFile, "") return } if o == eOperation.Cancel() { set("ignore-error-if-completed", p.ignoreErrorIfCompleted, "") return } // TODO: TODO: nakulkar-msft there will be many more to add here set("recursive", p.recursive, false) set("as-subdir", !p.invertedAsSubdir, true) set("include-path", p.includePath, "") set("exclude-path", p.excludePath, "") set("include-pattern", p.includePattern, "") set("exclude-pattern", p.excludePattern, "") set("include-after", p.includeAfter, "") set("include-pattern", p.includePattern, "") set("exclude-path", p.excludePath, "") set("exclude-pattern", p.excludePattern, "") set("cap-mbps", p.capMbps, float32(0)) set("block-size-mb", p.blockSizeMB, float32(0)) set("put-blob-size-mb", p.putBlobSizeMB, float32(0)) set("s2s-detect-source-changed", p.s2sSourceChangeValidation, false) set("metadata", p.metadata, "") set("cancel-from-stdin", p.cancelFromStdin, false) set("preserve-smb-info", p.preserveSMBInfo, nil) set("preserve-smb-permissions", p.preserveSMBPermissions, false) set("backup", p.backupMode, false) set("blob-tags", p.blobTags, "") set("blob-type", p.blobType, "") set("s2s-preserve-blob-tags", p.s2sPreserveBlobTags, false) set("cpk-by-name", p.cpkByName, "") set("cpk-by-value", p.cpkByValue, false) set("is-object-dir", p.isObjectDir, false) set("debug-skip-files", strings.Join(p.debugSkipFiles, ";"), "") set("check-md5", p.checkMd5.String(), "FailIfDifferent") set("trailing-dot", p.trailingDot.String(), "Enable") set("force-if-read-only", p.forceIfReadOnly, false) set("delete-destination-file", p.deleteDestinationFile, false) if o == eOperation.Copy() { set("s2s-preserve-access-tier", p.s2sPreserveAccessTier, true) set("preserve-posix-properties", p.preservePOSIXProperties, "") switch p.symlinkHandling { case common.ESymlinkHandlingType.Follow(): set("follow-symlinks", true, nil) case common.ESymlinkHandlingType.Preserve(): set("preserve-symlinks", true, nil) } target := s.GetTestFiles().objectTarget if s.fromTo.From() == common.ELocation.Blob() && s.fs.isListOfVersions() { // Otherwise, it must be a list. s.a.Assert(s.fromTo.From(), equals(), common.ELocation.Blob(), "list of files can only be used in blob.") versions := s.GetSource().(*resourceBlobContainer).getVersions(s.a, target.objectName) s.a.Assert(len(versions) > 0, equals(), true, "blob was expected to have versions!") listOfVersions := make([]string, len(target.versions)) for idx, val := range target.versions { s.a.Assert(int(val) < len(versions), equals(), true, fmt.Sprintf("Not enough versions are present! (needed version %d of %d)", val, len(versions))) listOfVersions[idx] = versions[val] } file, err := os.CreateTemp("", "listofversions*.json") defer func(file *os.File) { _ = file.Close() }(file) s.a.AssertNoErr(err, "create temp list of versions file") for _, v := range listOfVersions { _, err = file.WriteString(v + "\n") s.a.AssertNoErr(err, "write to list of versions file") } set("list-of-versions", file.Name(), "") } } else if o == eOperation.Sync() { set("delete-destination", p.deleteDestination.String(), "False") set("preserve-posix-properties", p.preservePOSIXProperties, false) set("compare-hash", p.compareHash.String(), "None") set("local-hash-storage-mode", p.hashStorageMode.String(), common.EHashStorageMode.Default().String()) set("hash-meta-dir", p.hashStorageDir, "") } } func (t *TestRunner) SetAwaitOpenFlag() { t.flags["await-open"] = "true" } func (t *TestRunner) computeArgs() []string { args := make([]string, 0) for key, value := range t.flags { args = append(args, fmt.Sprintf("--%s=%s", key, value)) } args = append(args, "--log-level=DEBUG") return append(args, "--output-type=json") } // execCommandWithOutput replaces Go's exec.Command().Output, but appends an extra parameter and // breaks up the c.Run() call into its component parts. Both changes are to assist debugging func (t *TestRunner) execDebuggableWithOutput(name string, args []string, env []string, afterStart func() string, chToStdin <-chan string) ([]byte, error) { debug := isLaunchedByDebugger if debug { args = append(args, "--await-continue") } c := exec.Command(name, args...) // add environment variables if env != nil { c.Env = env } var stdout bytes.Buffer var stderr bytes.Buffer stdin, err := c.StdinPipe() if err != nil { return make([]byte, 0), err } c.Stdout = &stdout c.Stderr = &stderr // instead of err := c.Run(), we do the following runErr := c.Start() if runErr == nil { defer func() { _ = c.Process.Kill() // in case we never finish c.Wait() below, and get panicked or killed }() if debug { beginAzCopyDebugging(stdin) } // perform a specific post-start action if afterStart != nil { msgToApp := afterStart() // perform a local action, here in the test suite, that may optionally produce a message to send to the the app if msgToApp != "" { _, _ = stdin.Write([]byte(msgToApp + "\n")) // TODO: maybe change this to use chToStdIn } } // allow on-going messages to stdin if chToStdin != nil { go func() { for { msg, ok := <-chToStdin if ok { _, _ = stdin.Write([]byte(msg + "\n")) } else { break } } }() } // wait for completion runErr = c.Wait() } // back to normal exec.Cmd.Output() processing if runErr != nil { if ee, ok := runErr.(*exec.ExitError); ok { ee.Stderr = stderr.Bytes() } } return stdout.Bytes(), runErr } func (t *TestRunner) ExecuteAzCopyCommand(operation Operation, src, dst string, needsOAuth bool, oauthMode string, needsFromTo bool, fromTo common.FromTo, afterStart func() string, chToStdin <-chan string, logDir string) (CopyOrSyncCommandResult, bool, error) { capLen := func(b []byte) []byte { if len(b) < 1024 { return b } else { return append(b[:1024], byte('\n')) } } verb := "" switch operation { case eOperation.Copy(): verb = "copy" case eOperation.Sync(): verb = "sync" case eOperation.Remove(): verb = "remove" case eOperation.Resume(): verb = "jobs resume" case eOperation.Cancel(): verb = "cancel" case eOperation.Benchmark(): verb = "bench" default: panic("unsupported operation type") } args := strings.Split(verb, " ") args = append(args, src) if operation.NeedsDst() { args = append(args, dst) } args = append(args, t.computeArgs()...) if needsFromTo { args = append(args, "--from-to="+fromTo.String()) } // pass along existing environment variables (because $HOME doesn't come along if we just use the OAuth vars, that can be troublesome!) env := make([]string, len(os.Environ())) copy(env, os.Environ()) if needsOAuth { switch strings.ToLower(oauthMode) { case common.EAutoLoginType.SPN().String(): tenId, appId, clientSecret := GlobalInputManager{}.GetServicePrincipalAuth() env = append(env, "AZCOPY_AUTO_LOGIN_TYPE="+common.Iff(oauthMode == "", common.EAutoLoginType.SPN().String(), oauthMode), "AZCOPY_SPA_APPLICATION_ID="+appId, "AZCOPY_SPA_CLIENT_SECRET="+clientSecret, ) if tenId != "" { env = append(env, "AZCOPY_TENANT_ID="+tenId) } case "", common.EAutoLoginType.AzCLI().String(): if os.Getenv("NEW_E2E_ENVIRONMENT") == AzurePipeline { // We are already logged in with AzCLI in Azure Pipeline } else { tenId, appId, clientSecret := GlobalInputManager{}.GetServicePrincipalAuth() args := []string{ "login", "--service-principal", "-u=" + appId, "-p=" + clientSecret, } if tenId != "" { args = append(args, "--tenant="+tenId) env = append(env, "AZCOPY_TENANT_ID="+tenId) } out, err := exec.Command("az", args...).Output() if err != nil { e, ok := err.(*exec.ExitError) if ok { return CopyOrSyncCommandResult{}, false, fmt.Errorf("%s\n%s\nfailed to login with AzCli: %s", e.Stderr, out, err.Error()) } else { return CopyOrSyncCommandResult{}, false, fmt.Errorf("failed to login with AzCli: %s", err.Error()) } } } env = append(env, "AZCOPY_AUTO_LOGIN_TYPE=AzCLI") case "pscred": var script string if os.Getenv("NEW_E2E_ENVIRONMENT") == AzurePipeline { tenId, clientId, token := GlobalInputManager{}.GetWorkloadIdentity() cmd := `Connect-AzAccount -ApplicationId %s -Tenant %s -FederatedToken %s` script = fmt.Sprintf(cmd, clientId, tenId, token) } else { tenId, appId, clientSecret := GlobalInputManager{}.GetServicePrincipalAuth() cmd := `$secret = ConvertTo-SecureString -String %s -AsPlainText -Force; $cred = New-Object -TypeName System.Management.Automation.PSCredential -ArgumentList %s, $secret; Connect-AzAccount -ServicePrincipal -Credential $cred` if tenId != "" { cmd += " -Tenant " + tenId } script = fmt.Sprintf(cmd, clientSecret, appId) } out, err := exec.Command("pwsh", "-Command", script).Output() if err != nil { e := err.(*exec.ExitError) e, ok := err.(*exec.ExitError) if ok { return CopyOrSyncCommandResult{}, false, fmt.Errorf("%s\n%s\nfailed to login with Powershell: %s", e.Stderr, out, err.Error()) } else { return CopyOrSyncCommandResult{}, false, fmt.Errorf("failed to login with Powershell: %s", err.Error()) } } env = append(env, "AZCOPY_AUTO_LOGIN_TYPE=PsCred") default: return CopyOrSyncCommandResult{}, false, errors.New("Unsupported OAuth mode " + oauthMode) } } if logDir != "" { env = append(env, "AZCOPY_LOG_LOCATION="+logDir) env = append(env, "AZCOPY_JOB_PLAN_LOCATION="+filepath.Join(logDir, "plans")) } out, err := t.execDebuggableWithOutput(GlobalInputManager{}.GetExecutablePath(), args, env, afterStart, chToStdin) wasClean := true stdErr := make([]byte, 0) if err != nil { if ee, ok := err.(*exec.ExitError); ok { stdErr = capLen(ee.Stderr) // cap length of this, because it can be a panic. But don't cap stdout, because we need its last line in newCopyOrSyncCommandResult if len(stdErr) > 0 { wasClean = false // something was written to stderr, probably a panic } } } if wasClean { // either it succeeded, for it returned a failure code in a clean (non-panic) way. // In both cases, we want out to be parsed, to get us the job ID. E.g. maybe 1 transfer out of several failed, // and that's what we'er actually testing for (so can't treat this as a fatal error). r, ok := newCopyOrSyncCommandResult(string(out)) if ok { return r, true, err } else { err = fmt.Errorf("could not parse AzCopy output. Run error, if any, was '%w'", err) } } return CopyOrSyncCommandResult{}, false, fmt.Errorf("azcopy run error: %w\n with stderr: %s\n and stdout: %s\n from args %v", err, stdErr, out, args) } func (t *TestRunner) SetTransferStatusFlag(value string) { t.flags["with-status"] = value } func (t *TestRunner) ExecuteJobsShowCommand(jobID common.JobID, azcopyDir string) (JobsShowCommandResult, error) { args := append([]string{"jobs", "show", jobID.String()}, t.computeArgs()...) cmd := exec.Command(GlobalInputManager{}.GetExecutablePath(), args...) if azcopyDir != "" { cmd.Env = append(cmd.Env, "AZCOPY_JOB_PLAN_LOCATION="+filepath.Join(azcopyDir, "plans")) } out, err := cmd.Output() if err != nil { return JobsShowCommandResult{}, err } return newJobsShowCommandResult(string(out)), nil } type CopyOrSyncCommandResult struct { jobID common.JobID finalStatus common.ListSyncJobSummaryResponse } func newCopyOrSyncCommandResult(rawOutput string) (CopyOrSyncCommandResult, bool) { lines := strings.Split(rawOutput, "\n") // parse out the final status // -2 because the last line is empty if len(lines) < 2 { return CopyOrSyncCommandResult{}, false } finalLine := lines[len(lines)-2] finalMsg := common.JsonOutputTemplate{} err := json.Unmarshal([]byte(finalLine), &finalMsg) if err != nil { return CopyOrSyncCommandResult{}, false } jobSummary := common.ListSyncJobSummaryResponse{} // this is a superset of ListJobSummaryResponse, so works for both copy and sync err = json.Unmarshal([]byte(finalMsg.MessageContent), &jobSummary) if err != nil { return CopyOrSyncCommandResult{}, false } return CopyOrSyncCommandResult{jobID: jobSummary.JobID, finalStatus: jobSummary}, true } func (c *CopyOrSyncCommandResult) GetTransferList(status common.TransferStatus, azcopyDir string) ([]common.TransferDetail, error) { runner := newTestRunner() runner.SetTransferStatusFlag(status.String()) // invoke AzCopy to get the status from the plan files result, err := runner.ExecuteJobsShowCommand(c.jobID, azcopyDir) if err != nil { return make([]common.TransferDetail, 0), err } return result.transfers, nil } type JobsShowCommandResult struct { jobID common.JobID transfers []common.TransferDetail } func newJobsShowCommandResult(rawOutput string) JobsShowCommandResult { lines := strings.Split(rawOutput, "\n") // parse out the final status // -2 because the last line is empty finalLine := lines[len(lines)-2] finalMsg := common.JsonOutputTemplate{} err := json.Unmarshal([]byte(finalLine), &finalMsg) if err != nil { panic(err) } listTransfersResponse := common.ListJobTransfersResponse{} err = json.Unmarshal([]byte(finalMsg.MessageContent), &listTransfersResponse) if err != nil { panic(err) } return JobsShowCommandResult{jobID: listTransfersResponse.JobID, transfers: listTransfersResponse.Details} }