e2etest/newe2e_azcopy_environment_manager.go (219 lines of code) (raw):
package e2etest
import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
type AzCopyEnvironmentManagerKey struct{}
type AzCopyEnvironmentKey struct{}
type AzCopyRunNumKey struct{}
func FetchAzCopyEnvironmentContext(cm ScenarioAsserter) *AzCopyEnvironmentContext {
pCtx := cm.Context()
out := pCtx.Value(AzCopyEnvironmentManagerKey{})
if out == nil {
envCtx := &AzCopyEnvironmentContext{
ScenarioID: uuid.New(),
Environments: make([]*AzCopyEnvironment, 0),
mu: &sync.Mutex{},
cleanupOnce: &sync.Once{},
parent: pCtx,
}
cm.SetContext(envCtx)
return envCtx
}
return out.(*AzCopyEnvironmentContext)
}
type LogUpload struct {
EnvironmentID uint
RunID uint
Stdout string
Stderr string
}
type AzCopyEnvironmentContext struct {
ScenarioID uuid.UUID
Environments []*AzCopyEnvironment
LogUploads []LogUpload
// let's just code defensively and assume there will be a test case that will run azcopy instances in parallel
mu *sync.Mutex
cleanupOnce *sync.Once
parent context.Context
}
func (envCtx *AzCopyEnvironmentContext) Deadline() (deadline time.Time, ok bool) {
return envCtx.parent.Deadline()
}
func (envCtx *AzCopyEnvironmentContext) Done() <-chan struct{} {
return envCtx.parent.Done()
}
func (envCtx *AzCopyEnvironmentContext) Err() error {
return envCtx.parent.Err()
}
func (envCtx *AzCopyEnvironmentContext) Value(key any) any {
if key == (AzCopyEnvironmentManagerKey{}) {
return envCtx
}
return envCtx.parent.Value(key)
}
func (envCtx *AzCopyEnvironmentContext) GetEnvTempPath(env *AzCopyEnvironment) string {
return filepath.Join(os.TempDir(), envCtx.ScenarioID.String(), fmt.Sprintf("%03d", DerefOrZero(env.EnvironmentId)))
}
func (envCtx *AzCopyEnvironmentContext) GetEnvUploadPath(env *AzCopyEnvironment) string {
return filepath.Join(GlobalConfig.AzCopyExecutableConfig.LogDropPath, envCtx.ScenarioID.String(), fmt.Sprintf("%03d", DerefOrZero(env.EnvironmentId)))
}
const (
LogSubdir = "log"
PlanSubdir = "plan"
PprofSubdir = "pprof"
PprofMemFmt = "%03d.memory.pprof"
StdoutFmt = "%03d.stdout.txt"
StderrFmt = "%03d.stderr.txt"
)
// CreateEnvironment should be called in the event an AzCopyEnvironment wasn't specified, and should be presumed to be Run 0.
func (envCtx *AzCopyEnvironmentContext) CreateEnvironment() *AzCopyEnvironment {
envCtx.mu.Lock()
defer envCtx.mu.Unlock()
out := &AzCopyEnvironment{
ParentContext: envCtx,
EnvironmentId: pointerTo(uint(len(envCtx.Environments))),
RunCount: pointerTo[uint](1),
}
envCtx.Environments = append(envCtx.Environments, out)
return out
}
// RegisterEnvironment should be called if an AzCopyEnvironment is specified, and will no-op if the env already was registered.
func (envCtx *AzCopyEnvironmentContext) RegisterEnvironment(env *AzCopyEnvironment) (runId uint) {
envCtx.mu.Lock()
defer envCtx.mu.Unlock()
rc := DerefOrZero(env.RunCount)
env.RunCount = pointerTo(rc + 1)
if env.EnvironmentId != nil {
return rc
}
env.EnvironmentId = pointerTo(uint(len(envCtx.Environments)))
env.ParentContext = envCtx
return rc
}
func (envCtx *AzCopyEnvironmentContext) RegisterLogUpload(upload LogUpload) {
envCtx.mu.Lock()
envCtx.mu.Unlock()
envCtx.LogUploads = append(envCtx.LogUploads, upload)
}
func (envCtx *AzCopyEnvironmentContext) SetupCleanup(a ScenarioAsserter) {
envCtx.cleanupOnce.Do(func() {
a.Cleanup(func(a Asserter) {
envCtx.DoCleanup(a)
})
})
}
func (envCtx *AzCopyEnvironmentContext) DoCleanup(a Asserter) {
envCtx.mu.Lock()
defer envCtx.mu.Unlock()
// defer deletion of the temp dir
defer func() {
_ = os.RemoveAll(filepath.Join(os.TempDir(), envCtx.ScenarioID.String()))
}()
// Upload all of the files on the disk that are needed
for _, v := range envCtx.Environments {
v.DoCleanup(a)
}
if a.Failed() && GlobalConfig.AzCopyExecutableConfig.LogDropPath != "" {
for _, logUpload := range envCtx.LogUploads {
env := envCtx.Environments[logUpload.EnvironmentID]
envUploadDir := envCtx.GetEnvUploadPath(env)
if len(logUpload.Stdout) > 0 {
logFile, err := os.Create(filepath.Join(envUploadDir, fmt.Sprintf(StdoutFmt, logUpload.RunID)))
a.NoError(fmt.Sprintf("create stdout file "+StdoutFmt, logUpload.RunID), err)
if err == nil {
_, err = logFile.WriteString(logUpload.Stdout)
a.NoError(fmt.Sprintf("write stdout file "+StdoutFmt, logUpload.RunID), err)
}
}
if len(logUpload.Stderr) > 0 {
logFile, err := os.Create(filepath.Join(envUploadDir, fmt.Sprintf(StderrFmt, logUpload.RunID)))
a.NoError(fmt.Sprintf("create stderr file "+StderrFmt, logUpload.RunID), err)
if err == nil {
_, err = logFile.WriteString(logUpload.Stderr)
a.NoError(fmt.Sprintf("write stderr file "+StderrFmt, logUpload.RunID), err)
}
}
}
}
}
func (env *AzCopyEnvironment) DoCleanup(a Asserter) {
p := env.ParentContext
envPath := p.GetEnvTempPath(env)
// Upload the memory profiles even if we didn't fail
for pprofRun := range *env.RunCount {
memProfLoc := filepath.Join(
envPath,
PprofSubdir,
fmt.Sprintf(PprofMemFmt, pprofRun))
UploadMemoryProfile(a, memProfLoc, pprofRun)
}
// set up the log drop path
logDropPath := GlobalConfig.AzCopyExecutableConfig.LogDropPath
if !a.Failed() || logDropPath == "" {
return
}
logDropPath = env.ParentContext.GetEnvUploadPath(env)
// DRY
CopyDir := func(source, dest string) error {
err := os.MkdirAll(dest, os.ModePerm|os.ModeDir)
if err != nil && !errors.Is(err, os.ErrExist) {
return fmt.Errorf("failed to create dest directory: %w", err)
}
var errList []error
err = filepath.WalkDir(source, func(path string, d fs.DirEntry, err error) error {
relPath := strings.TrimPrefix(path, source)
if err != nil {
errList = append(errList, fmt.Errorf("failed to read %s: %w", relPath, err))
return nil
}
if d.IsDir() {
err = os.MkdirAll(filepath.Join(dest, relPath), os.ModePerm|os.ModeDir)
if err != nil {
errList = append(errList, fmt.Errorf("failed to create dir %s: %w", relPath, err))
}
return nil
}
srcFile, err := os.Open(path)
if err != nil {
errList = append(errList, fmt.Errorf("failed to open file %s: %w", relPath, err))
return nil
}
defer func() {
_ = srcFile.Close()
}()
destFile, err := os.Create(filepath.Join(dest, relPath))
if err != nil {
errList = append(errList, fmt.Errorf("failed to create dest file %s: %w", relPath, err))
return nil
}
defer func() {
_ = destFile.Close()
}()
_, err = io.Copy(destFile, srcFile)
if err != nil {
errList = append(errList, fmt.Errorf(""))
}
return nil
})
if errCt := len(errList); errCt == 0 {
return nil
} else if errCt == 1 {
return errList[0]
} else {
out := "Encountered multiple errors copying directory: "
for _, v := range errList {
out += "\n"
out += v.Error()
}
return errors.New(out)
}
}
// Upload logs
err := CopyDir(*env.LogLocation, filepath.Join(logDropPath, LogSubdir))
a.NoError("failed to copy logs", err)
// Upload plans
err = CopyDir(*env.JobPlanLocation, filepath.Join(logDropPath, PlanSubdir))
a.NoError("failed to copy plans", err)
uploadRelPath := strings.TrimPrefix(logDropPath, GlobalConfig.AzCopyExecutableConfig.LogDropPath)
a.Log("Uploaded logs for session to %s", uploadRelPath)
}