e2etest/newe2e_task_runazcopy.go (385 lines of code) (raw):

package e2etest

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"reflect"
	"strings"

	"github.com/Azure/azure-storage-azcopy/v10/common"
)

// AzCopyJobPlan todo probably load the job plan directly? WI#26418256
type AzCopyJobPlan struct{}

// AzCopyStdout is the sink RunAzCopy hands to the AzCopy child process as its
// standard output. Besides being written to, it can report what it captured
// either line by line (RawStdout) or as one string (String).
type AzCopyStdout interface {
	RawStdout() []string

	io.Writer
	fmt.Stringer
}

// AzCopyDiscardStdout is an AzCopyStdout that retains nothing.
type AzCopyDiscardStdout struct{}

// RawStdout always returns an empty (non-nil) slice.
func (a *AzCopyDiscardStdout) RawStdout() []string {
	return []string{}
}

// Write echoes p to this process's standard output and reports it as fully
// consumed. Nothing is stored, so RawStdout and String stay empty.
func (a *AzCopyDiscardStdout) Write(p []byte) (n int, err error) {
	fmt.Print(string(p)) // echoed only; never retained
	return len(p), nil
}

// String always returns the empty string.
func (a *AzCopyDiscardStdout) String() string {
	return ""
}

// AzCopyRawStdout shouldn't be used or relied upon right now! This will be fleshed out eventually. todo WI#26418258
type AzCopyRawStdout struct {
	RawOutput []string
}

// RawStdout returns the captured output as recorded by Write.
func (a *AzCopyRawStdout) RawStdout() []string {
	return a.RawOutput
}

// Write splits p on "\n" and appends every resulting piece to RawOutput.
// Each call is split independently, so a trailing newline yields an empty
// final piece and a line spanning two writes is recorded as two entries.
func (a *AzCopyRawStdout) Write(p []byte) (n int, err error) {
	str := string(p)
	lines := strings.Split(str, "\n")

	a.RawOutput = append(a.RawOutput, lines...)

	return len(p), nil
}

// String joins the captured pieces back together with "\n".
func (a *AzCopyRawStdout) String() string {
	return strings.Join(a.RawOutput, "\n")
}

var _ AzCopyStdout = &AzCopyRawStdout{}

// AzCopyVerb is the AzCopy command to invoke. Multi-word verbs (e.g.
// "jobs list") are split on spaces into separate arguments by RunAzCopy.
type AzCopyVerb string

const (
	// initially supporting a limited set of verbs
	AzCopyVerbCopy        AzCopyVerb = "copy"
	AzCopyVerbSync        AzCopyVerb = "sync"
	AzCopyVerbRemove      AzCopyVerb = "remove"
	AzCopyVerbList        AzCopyVerb = "list"
	AzCopyVerbLogin       AzCopyVerb = "login"
	AzCopyVerbLoginStatus AzCopyVerb = "login status"
	AzCopyVerbLogout      AzCopyVerb = "logout"
	AzCopyVerbJobsList    AzCopyVerb = "jobs list"
	AzCopyVerbJobsResume  AzCopyVerb = "jobs resume"
	AzCopyVerbJobsClean   AzCopyVerb = "jobs clean"
)

// AzCopyTarget wraps a ResourceManager together with the credential type and
// URI options that should be used when it is passed to AzCopy.
type AzCopyTarget struct {
	ResourceManager
	AuthType ExplicitCredentialTypes // Expects *one* credential type that the Resource supports. Assumes SAS (or GCP/S3) if not present.
	Opts     CreateAzCopyTargetOptions

	// todo: SAS permissions
	// todo: specific OAuth types (e.g. MSI, etc.)
}

// CreateAzCopyTargetOptions carries the optional URI-shaping settings of an AzCopyTarget.
type CreateAzCopyTargetOptions struct {
	// SASTokenOptions expects a GenericSignatureValues, which can contain account signatures, or a service signature.
	SASTokenOptions GenericSignatureValues
	Scheme          string
	// The wildcard string to append to the end of a resource URI.
	Wildcard string
}

// CreateAzCopyTarget builds an AzCopyTarget for rm using authType.
//
// If rm is a RemoteResourceManager that reports any valid auth types, authType
// must name exactly one type and that type must be among the valid ones.
// Otherwise authType must be None. Violations are reported through
// a.AssertNow. Only the first element of opts (if any) is used.
func CreateAzCopyTarget(rm ResourceManager, authType ExplicitCredentialTypes, a Asserter, opts ...CreateAzCopyTargetOptions) AzCopyTarget {
	var validTypes ExplicitCredentialTypes
	if rrm, ok := rm.(RemoteResourceManager); ok {
		validTypes = rrm.ValidAuthTypes()
	}

	if validTypes != EExplicitCredentialType.None() {
		a.AssertNow(fmt.Sprintf("expected only one auth type, got %s", authType), Equal{}, authType.Count(), 1)
		a.AssertNow(fmt.Sprintf("expected authType to be contained within valid types (got %s, needed %s)", authType, validTypes), Equal{}, validTypes.Includes(authType), true)
	} else {
		a.AssertNow("Expected no auth types", Equal{}, authType, EExplicitCredentialType.None())
	}

	return AzCopyTarget{rm, authType, FirstOrZero(opts)}
}

// AzCopyCommand describes a single AzCopy invocation for RunAzCopy.
type AzCopyCommand struct {
	Verb           AzCopyVerb
	PositionalArgs []string
	// Passing a ResourceManager assumes SAS (or GCP/S3) auth is intended.
	// Passing an AzCopyTarget will allow you to specify an exact credential type.
	// When OAuth, S3, GCP, AcctKey, etc. the appropriate env flags should auto-populate.
	Targets     []ResourceManager
	Flags       any // check SampleFlags
	Environment *AzCopyEnvironment

	// If Stdout is nil, a sensible default is picked in place.
	Stdout AzCopyStdout

	ShouldFail bool
}

// AzCopyEnvironment describes the environment variables an AzCopy child
// process is started with. Fields tagged `env` are turned into NAME=value
// pairs by RunAzCopy (via MapFromTags).
type AzCopyEnvironment struct {
	// `env:"XYZ"` is reused but does not inherit the traits of config's env trait. Merely used for low-code mapping.

	LogLocation                  *string `env:"AZCOPY_LOG_LOCATION,defaultfunc:DefaultLogLoc"`
	JobPlanLocation              *string `env:"AZCOPY_JOB_PLAN_LOCATION,defaultfunc:DefaultPlanLoc"`
	AutoLoginMode                *string `env:"AZCOPY_AUTO_LOGIN_TYPE"`
	AutoLoginTenantID            *string `env:"AZCOPY_TENANT_ID"`
	ServicePrincipalAppID        *string `env:"AZCOPY_SPA_APPLICATION_ID"`
	ServicePrincipalClientSecret *string `env:"AZCOPY_SPA_CLIENT_SECRET"`

	AzureFederatedTokenFile *string `env:"AZURE_FEDERATED_TOKEN_FILE"`
	AzureTenantId           *string `env:"AZURE_TENANT_ID"`
	AzureClientId           *string `env:"AZURE_CLIENT_ID"`

	LoginCacheName *string `env:"AZCOPY_LOGIN_CACHE_NAME"`

	// InheritEnvironment is a lowercase list of environment variables to always inherit.
	// Specifying "*" as an entry with the value "true" will act as a wildcard, and inherit all env vars.
	InheritEnvironment map[string]bool `env:",defaultfunc:DefaultInheritEnvironment"`

	ManualLogin bool

	// These fields should almost never be intentionally set by a test writer unless the author really knows what they're doing,
	// as the fields are automatically controlled.
	ParentContext *AzCopyEnvironmentContext
	EnvironmentId *uint
	RunCount      *uint
}

// InheritEnvVar marks the host environment variable name (matched
// case-insensitively) as one the child process should inherit, initialising
// the inherit list with the defaults first if it has not been set.
func (env *AzCopyEnvironment) InheritEnvVar(name string) {
	env.EnsureInheritEnvironment()
	env.InheritEnvironment[strings.ToLower(name)] = true
}

// EnsureInheritEnvironment populates InheritEnvironment with the defaults when it is nil.
func (env *AzCopyEnvironment) EnsureInheritEnvironment() {
	if env.InheritEnvironment == nil {
		env.DefaultInheritEnvironment(nil, context.TODO()) // context isn't important in this default yet
	}
}

// RunAzCopyDefaultInheritEnvironment lists (in lowercase) the host environment
// variables every AzCopyEnvironment inherits by default. Environments receive
// a copy of this map, so per-environment additions never alter it.
var RunAzCopyDefaultInheritEnvironment = map[string]bool{
	"path": true,
	"home": true,

	"userprofile": true,
	"homepath":    true,
	"homedrive":   true,

	"azure_config_dir": true,
}

// DefaultInheritEnvironment sets env.InheritEnvironment to a fresh copy of
// RunAzCopyDefaultInheritEnvironment and returns it. Neither a nor ctx is used.
//
// A copy is made (rather than sharing the package-level map) because
// InheritEnvVar writes into env.InheritEnvironment; sharing would let one
// environment's additions leak into all others.
func (env *AzCopyEnvironment) DefaultInheritEnvironment(a ScenarioAsserter, ctx context.Context) map[string]bool {
	inherit := make(map[string]bool, len(RunAzCopyDefaultInheritEnvironment))
	for name, keep := range RunAzCopyDefaultInheritEnvironment {
		inherit[name] = keep
	}

	env.InheritEnvironment = inherit
	return env.InheritEnvironment
}

// generateAzcopyDir creates this environment's temp directory (as reported by
// the AzCopyEnvironmentContext stored in ctx) and points LogLocation and
// JobPlanLocation at its LogSubdir and PlanSubdir children.
func (env *AzCopyEnvironment) generateAzcopyDir(a ScenarioAsserter, ctx context.Context) {
	envCtx := ctx.Value(AzCopyEnvironmentManagerKey{}).(*AzCopyEnvironmentContext)
	envTmpPath := envCtx.GetEnvTempPath(env)
	err := os.MkdirAll(envTmpPath, 0777)
	a.NoError("failed to create env dir ("+envTmpPath+")", err, true)
	env.LogLocation = pointerTo(filepath.Join(envTmpPath, LogSubdir))
	env.JobPlanLocation = pointerTo(filepath.Join(envTmpPath, PlanSubdir))
}

// DefaultLogLoc resolves the log location: if a job plan location is set the
// log location is pointed at it; otherwise, if no log location is set, a
// per-environment directory is generated. The resulting path is returned.
func (env *AzCopyEnvironment) DefaultLogLoc(a ScenarioAsserter, ctx context.Context) string {
	if env.JobPlanLocation != nil {
		env.LogLocation = env.JobPlanLocation
	} else if env.LogLocation == nil {
		env.generateAzcopyDir(a, ctx)
	}

	return *env.LogLocation
}

// DefaultPlanLoc resolves the job plan location: if a log location is set the
// plan location is pointed at it; otherwise, if no plan location is set, a
// per-environment directory is generated. The resulting path is returned.
func (env *AzCopyEnvironment) DefaultPlanLoc(a ScenarioAsserter, ctx context.Context) string {
	if env.LogLocation != nil {
		env.JobPlanLocation = env.LogLocation
	} else if env.JobPlanLocation == nil {
		env.generateAzcopyDir(a, ctx)
	}

	return *env.JobPlanLocation
}

// applyTargetAuth returns the URI to pass to AzCopy for target and, for OAuth
// targets, fills in the login-related fields of c.Environment as a side effect.
//
// The credential type defaults to SAS. An AzCopyTarget with exactly one auth
// type overrides it (and supplies SAS values, scheme and wildcard); a bare
// ResourceManager located in S3 or GCP selects the matching credential type.
func (c *AzCopyCommand) applyTargetAuth(a Asserter, target ResourceManager) string {
	intendedAuthType := EExplicitCredentialType.SASToken()
	var opts GetURIOptions
	if tgt, ok := target.(AzCopyTarget); ok {
		count := tgt.AuthType.Count()
		a.AssertNow("target auth type must be single", Equal{}, count <= 1, true)
		if count == 1 {
			intendedAuthType = tgt.AuthType
		}

		opts.AzureOpts.SASValues = tgt.Opts.SASTokenOptions
		opts.RemoteOpts.Scheme = tgt.Opts.Scheme
		opts.Wildcard = tgt.Opts.Wildcard
	} else if target.Location() == common.ELocation.S3() {
		intendedAuthType = EExplicitCredentialType.S3()
	} else if target.Location() == common.ELocation.GCP() {
		intendedAuthType = EExplicitCredentialType.GCP()
	}

	// NOTE(review): the S3 and GCP credential types have no case of their own
	// below, so they appear to reach the default branch — confirm this is intended.
	switch intendedAuthType {
	case EExplicitCredentialType.PublicAuth(), EExplicitCredentialType.None():
		return target.URI(opts) // no SAS, no nothing.
	case EExplicitCredentialType.SASToken():
		opts.AzureOpts.WithSAS = true
		return target.URI(opts)
	case EExplicitCredentialType.OAuth():
		// Only set it if it wasn't already configured. If it was manually configured,
		// special testing may be occurring, and this may be indicated to just get a SAS-less URI.
		// Alternatively, we may have already configured it here once before.
		if !c.Environment.ManualLogin {
			if c.Environment.AutoLoginMode == nil &&
				c.Environment.ServicePrincipalAppID == nil &&
				c.Environment.ServicePrincipalClientSecret == nil &&
				c.Environment.AutoLoginTenantID == nil {
				if GlobalConfig.StaticResources() {
					staticOauth := GlobalConfig.E2EAuthConfig.StaticStgAcctInfo.StaticOAuth
					tenant := staticOauth.TenantID
					if useSPN, _, appId, secret := GlobalConfig.GetSPNOptions(); useSPN {
						c.Environment.AutoLoginMode = pointerTo("SPN")
						a.AssertNow("At least NEW_E2E_STATIC_APPLICATION_ID and NEW_E2E_STATIC_CLIENT_SECRET must be specified to use OAuth.", Empty{true}, appId, secret)

						c.Environment.ServicePrincipalAppID = &appId
						c.Environment.ServicePrincipalClientSecret = &secret
						c.Environment.AutoLoginTenantID = common.Iff(tenant != "", &tenant, nil)
					} else if staticOauth.OAuthSource.PSInherit {
						c.Environment.AutoLoginMode = pointerTo("pscred")
						c.Environment.AutoLoginTenantID = common.Iff(tenant != "", &tenant, nil)
					} else if staticOauth.OAuthSource.CLIInherit {
						c.Environment.AutoLoginMode = pointerTo("azcli")
						c.Environment.AutoLoginTenantID = common.Iff(tenant != "", &tenant, nil)
					}
				} else {
					// oauth should reliably work
					oAuthInfo := GlobalConfig.E2EAuthConfig.SubscriptionLoginInfo
					if oAuthInfo.Environment == AzurePipeline {
						// No need to force keep path, we already inherit that.
						c.Environment.InheritEnvVar(WorkloadIdentityToken)
						c.Environment.InheritEnvVar(WorkloadIdentityServicePrincipalID)
						c.Environment.InheritEnvVar(WorkloadIdentityTenantID)

						c.Environment.AutoLoginTenantID = common.Iff(oAuthInfo.DynamicOAuth.Workload.TenantId != "", &oAuthInfo.DynamicOAuth.Workload.TenantId, nil)
						c.Environment.AutoLoginMode = pointerTo(common.EAutoLoginType.AzCLI().String())
					} else {
						c.Environment.AutoLoginMode = pointerTo(common.EAutoLoginType.SPN().String())
						c.Environment.ServicePrincipalAppID = &oAuthInfo.DynamicOAuth.SPNSecret.ApplicationID
						c.Environment.ServicePrincipalClientSecret = &oAuthInfo.DynamicOAuth.SPNSecret.ClientSecret
						c.Environment.AutoLoginTenantID = common.Iff(oAuthInfo.DynamicOAuth.SPNSecret.TenantID != "", &oAuthInfo.DynamicOAuth.SPNSecret.TenantID, nil)
					}
				}
			} else if c.Environment.AutoLoginMode != nil {
				oAuthInfo := GlobalConfig.E2EAuthConfig.SubscriptionLoginInfo
				var mode common.AutoLoginType
				a.NoError("failed to parse auto login mode `"+*c.Environment.AutoLoginMode+"`", mode.Parse(*c.Environment.AutoLoginMode))
				if mode == common.EAutoLoginType.Workload() {
					// Take the federated token from the configured workload identity info
					token := oAuthInfo.DynamicOAuth.Workload.FederatedToken
					a.AssertNow("idToken must be specified to authenticate with workload identity", Empty{Invert: true}, token)

					// Write the token to a temporary file. The file is deliberately not
					// removed here: the child process reads it via AZURE_FEDERATED_TOKEN_FILE.
					file, err := os.CreateTemp("", "azure_federated_token.txt")
					a.AssertNow("Error creating temporary file", IsNil{}, err)
					defer file.Close()

					// Write the token to the temporary file
					_, err = file.WriteString(token)
					a.AssertNow("Error writing to temporary file", IsNil{}, err)

					// Set the AZURE_FEDERATED_TOKEN_FILE environment variable
					c.Environment.AzureFederatedTokenFile = pointerTo(file.Name())
					c.Environment.AzureTenantId = pointerTo(oAuthInfo.DynamicOAuth.Workload.TenantId)
					c.Environment.AzureClientId = pointerTo(oAuthInfo.DynamicOAuth.Workload.ClientId)
				}
			}
		}

		return target.URI(opts) // Generate like public
	default:
		a.Error("unsupported credential type")
		return target.URI(opts)
	}
}

// RunAzCopy todo define more cleanly, implement
//
// RunAzCopy launches the configured AzCopy executable as described by
// commandSpec, waits for it to exit, asserts on success/failure according to
// commandSpec.ShouldFail, and registers the captured stdout/stderr for log
// upload. It returns the stdout sink that was used and an empty job plan.
// In a dry run it does nothing and returns a nil stdout.
func RunAzCopy(a ScenarioAsserter, commandSpec AzCopyCommand) (AzCopyStdout, *AzCopyJobPlan) {
	if a.Dryrun() {
		return nil, &AzCopyJobPlan{}
	}
	a.HelperMarker().Helper()
	var flagMap map[string]string
	var envMap map[string]string

	// we have no need to update our context manager, Fetch should do it for us.
	envCtx := FetchAzCopyEnvironmentContext(a)
	envCtx.SetupCleanup(a) // Make sure we add the cleanup hook; the sync.Once ensures idempotency.

	// register our environment, or create a new one if needed.
	var runNum uint
	if env := commandSpec.Environment; env == nil {
		commandSpec.Environment = envCtx.CreateEnvironment()
	} else {
		runNum = envCtx.RegisterEnvironment(env)
	}

	ctx := context.WithValue(envCtx, AzCopyRunNumKey{}, runNum)
	ctx = context.WithValue(ctx, AzCopyEnvironmentKey{}, commandSpec.Environment)

	// separate these from the struct so their execution order is fixed
	// Setup the positional args
	args := func() []string {
		if commandSpec.Environment == nil {
			commandSpec.Environment = &AzCopyEnvironment{}
		}

		out := []string{GlobalConfig.AzCopyExecutableConfig.ExecutablePath}
		out = append(out, strings.Split(string(commandSpec.Verb), " ")...)
		out = append(out, commandSpec.PositionalArgs...)

		// Resolving a target may also populate auth settings on the environment,
		// which is why args are built before the env vars below.
		for _, v := range commandSpec.Targets {
			out = append(out, commandSpec.applyTargetAuth(a, v))
		}

		if commandSpec.Flags == nil {
			switch commandSpec.Verb {
			case AzCopyVerbCopy:
				commandSpec.Flags = CopyFlags{}
			case AzCopyVerbSync:
				commandSpec.Flags = SyncFlags{}
			case AzCopyVerbList:
				commandSpec.Flags = ListFlags{}
			case AzCopyVerbLogin:
				commandSpec.Flags = LoginFlags{}
			case AzCopyVerbLoginStatus:
				commandSpec.Flags = LoginStatusFlags{}
			case AzCopyVerbRemove:
				commandSpec.Flags = RemoveFlags{}
			default:
				commandSpec.Flags = GlobalFlags{}
			}
		}

		flagMap = MapFromTags(reflect.ValueOf(commandSpec.Flags), "flag", a, ctx)
		for k, v := range flagMap {
			out = append(out, fmt.Sprintf("--%s=%s", k, v))
		}

		return out
	}()

	// Setup the env vars
	env := func() []string {
		out := make([]string, 0)

		envMap = MapFromTags(reflect.ValueOf(commandSpec.Environment), "env", a, ctx)

		for k, v := range envMap {
			out = append(out, fmt.Sprintf("%s=%s", k, v))
		}

		// NOTE(review): inherited variables are appended after the explicit ones;
		// os/exec keeps the last value for a duplicated key, so an inherited
		// variable would win over an explicit one of the same name — confirm.
		if commandSpec.Environment.InheritEnvironment != nil {
			ieMap := commandSpec.Environment.InheritEnvironment
			if ieMap["*"] {
				out = append(out, os.Environ()...)
			} else {
				for _, v := range os.Environ() {
					// Skip malformed entries that carry no '=' instead of
					// slicing with a negative index.
					key, _, found := strings.Cut(v, "=")
					if !found {
						continue
					}

					if ieMap[strings.ToLower(key)] {
						out = append(out, v)
					}
				}
			}
		}

		return out
	}()

	var out = commandSpec.Stdout
	if out == nil {
		// Select the correct stdoutput parser
		switch {
		// Dry-run parser
		case strings.EqualFold(flagMap["dry-run"], "true") &&
			(strings.EqualFold(flagMap["output-type"], "json") ||
				strings.EqualFold(flagMap["output-type"], "text") ||
				flagMap["output-type"] == ""):
			// Dryrun has its own special sort of output, that supports non-json output.
			jsonMode := strings.EqualFold(flagMap["output-type"], "json")
			var fromTo common.FromTo
			if !jsonMode && len(commandSpec.Targets) >= 2 {
				fromTo = common.FromTo(commandSpec.Targets[0].Location())<<8 | common.FromTo(commandSpec.Targets[1].Location())
			}
			out = &AzCopyParsedDryrunStdout{
				JsonMode: jsonMode,
				fromTo:   fromTo,
				Raw:      make(map[string]bool),
			}

		// Text formats don't get parsed usually
		case !strings.EqualFold(flagMap["output-type"], "json"):
			out = &AzCopyRawStdout{}

		// Copy/sync/remove share the same output format
		case commandSpec.Verb == AzCopyVerbCopy || commandSpec.Verb == AzCopyVerbSync || commandSpec.Verb == AzCopyVerbRemove:
			out = &AzCopyParsedCopySyncRemoveStdout{
				JobPlanFolder: *commandSpec.Environment.JobPlanLocation,
				LogFolder:     *commandSpec.Environment.LogLocation,
			}

		// List
		case commandSpec.Verb == AzCopyVerbList:
			out = &AzCopyParsedListStdout{}

		// Jobs list
		case commandSpec.Verb == AzCopyVerbJobsList:
			out = &AzCopyParsedJobsListStdout{}

		// Jobs resume
		case commandSpec.Verb == AzCopyVerbJobsResume:
			out = &AzCopyParsedCopySyncRemoveStdout{ // Resume command treated the same as copy/sync/remove
				JobPlanFolder: *commandSpec.Environment.JobPlanLocation,
				LogFolder:     *commandSpec.Environment.LogLocation,
			}

		// Login status
		case commandSpec.Verb == AzCopyVerbLoginStatus:
			out = &AzCopyParsedLoginStatusStdout{}

		// Login (interactive)
		case commandSpec.Verb == AzCopyVerbLogin:
			var lType common.AutoLoginType
			if ltStr := flagMap["login-type"]; ltStr != "" {
				// A parse failure is deliberately ignored: lType then keeps its
				// zero value and the IsInteractive check below decides.
				_ = lType.Parse(ltStr)
			}

			if lType.IsInteractive() {
				out = NewAzCopyInteractiveStdout(a)
				break
			}

			fallthrough

		default: // We don't know how to parse this.
			out = &AzCopyRawStdout{}
		}
	}

	stderr := &bytes.Buffer{}
	command := exec.Cmd{
		Path: GlobalConfig.AzCopyExecutableConfig.ExecutablePath,
		Args: args,
		Env:  env,

		Stdout: out, // todo
		Stderr: stderr,
	}
	in, err := command.StdinPipe()
	a.NoError("get stdin pipe", err)
	err = command.Start()
	a.Assert("run command", IsNil{}, err)

	if isLaunchedByDebugger {
		beginAzCopyDebugging(in)
	}

	err = command.Wait()
	a.Assert("wait for finalize", common.Iff[Assertion](commandSpec.ShouldFail, Not{IsNil{}}, IsNil{}), err)
	a.Assert("expected exit code", common.Iff[Assertion](commandSpec.ShouldFail, Not{Equal{}}, Equal{}), 0, command.ProcessState.ExitCode())

	// validate log file retention for jobs clean command before the job logs are cleaned up and uploaded
	if !a.Failed() && commandSpec.Verb == AzCopyVerbJobsClean {
		ValidateLogFileRetention(a, *commandSpec.Environment.LogLocation, 1)
	}

	// The environment manager will handle cleanup for us-- All we need to do at this point is register our stdout.
	envCtx.RegisterLogUpload(LogUpload{
		EnvironmentID: *commandSpec.Environment.EnvironmentId,
		RunID:         runNum,
		Stdout:        out.String(),
		Stderr:        stderr.String(),
	})

	return out, &AzCopyJobPlan{}
}