e2etest/newe2e_runazcopy_stdout.go (235 lines of code) (raw):

package e2etest import ( "encoding/json" "github.com/Azure/azure-storage-azcopy/v10/cmd" "github.com/Azure/azure-storage-azcopy/v10/common" "strings" ) var _ AzCopyStdout = &AzCopyParsedStdout{} var _ AzCopyStdout = &AzCopyParsedListStdout{} var _ AzCopyStdout = &AzCopyParsedCopySyncRemoveStdout{} var _ AzCopyStdout = &AzCopyParsedDryrunStdout{} var _ AzCopyStdout = &AzCopyParsedJobsListStdout{} // ManySubscriberChannel is intended to reproduce the effects of .NET's events. // This allows us to *partially* answer the question of how we want to handle testing of prompting in the New E2E framework. // Assuming an async version of RunAzCopy eventually exists, one could hook into the relevant AzCopyParsedStdout-extending struct // and reply whenever a prompt is given. type ManySubscriberChannel[T any] struct { subscribers map[chan<- T]bool } func (m *ManySubscriberChannel[T]) Subscribe(target chan<- T) { if m.subscribers == nil { m.subscribers = make(map[chan<- T]bool) } m.subscribers[target] = true } func (m *ManySubscriberChannel[T]) SubscribeFunc(target func(T)) (handle chan<- T) { targetChannel := make(chan T) go func() { for { data, ok := <-targetChannel if !ok { break } target(data) } }() m.Subscribe(targetChannel) return targetChannel } // Unsubscribe will close the target channel, in addition to removing it from this channel's subscribers. func (m *ManySubscriberChannel[T]) Unsubscribe(target chan<- T) { if m.subscribers == nil { return } delete(m.subscribers, target) close(target) } func (m *ManySubscriberChannel[T]) Message(data T) { for k := range m.subscribers { k <- data } } // AzCopyParsedStdout is still a semi-raw stdout struct. type AzCopyParsedStdout struct { Messages []common.JsonOutputTemplate OnParsedLine ManySubscriberChannel[common.JsonOutputTemplate] } func (a *AzCopyParsedStdout) RawStdout() []string { out := make([]string, len(a.Messages)) for k, v := range a.Messages { buf, _ := json.Marshal(v) out[k] = string(buf) } return out } func (a *AzCopyParsedStdout) Write(p []byte) (n int, err error) { str := string(p) lines := strings.Split(strings.TrimSuffix(str, "\n"), "\n") n = len(p) for _, v := range lines { var out common.JsonOutputTemplate err = json.Unmarshal([]byte(v), &out) if err != nil { return } a.OnParsedLine.Message(out) a.Messages = append(a.Messages, out) } return } func (a *AzCopyParsedStdout) String() string { return strings.Join(a.RawStdout(), "\n") } type AzCopyParsedListStdout struct { AzCopyParsedStdout listenChan chan<- common.JsonOutputTemplate Items map[AzCopyOutputKey]cmd.AzCopyListObject Summary cmd.AzCopyListSummary } func (a *AzCopyParsedListStdout) InsertObject(obj cmd.AzCopyListObject) { if a.Items == nil { a.Items = make(map[AzCopyOutputKey]cmd.AzCopyListObject) } a.Items[AzCopyOutputKey{ Path: obj.Path, VersionId: obj.VersionId, }] = obj } func (a *AzCopyParsedListStdout) Write(p []byte) (n int, err error) { if a.listenChan == nil { a.listenChan = a.OnParsedLine.SubscribeFunc(func(line common.JsonOutputTemplate) { switch line.MessageType { case "ListObject": var object cmd.AzCopyListObject err = json.Unmarshal([]byte(line.MessageContent), &object) if err != nil { return } a.InsertObject(object) case "ListSummary": err = json.Unmarshal([]byte(line.MessageContent), &a.Summary) if err != nil { return } } }) } return a.AzCopyParsedStdout.Write(p) } type AzCopyParsedCopySyncRemoveStdout struct { AzCopyParsedStdout listenChan chan<- common.JsonOutputTemplate JobPlanFolder string LogFolder string InitMsg common.InitMsgJsonTemplate FinalStatus common.ListJobSummaryResponse } func (a *AzCopyParsedCopySyncRemoveStdout) Write(p []byte) (n int, err error) { if a.listenChan == nil { a.listenChan = a.OnParsedLine.SubscribeFunc(func(line common.JsonOutputTemplate) { switch line.MessageType { case common.EOutputMessageType.EndOfJob().String(): _ = json.Unmarshal([]byte(line.MessageContent), &a.FinalStatus) case common.EOutputMessageType.Init().String(): _ = json.Unmarshal([]byte(line.MessageContent), &a.InitMsg) } }) } return a.AzCopyParsedStdout.Write(p) } type AzCopyParsedDryrunStdout struct { AzCopyRawStdout fromTo common.FromTo // fallback for text output listenChan chan<- cmd.DryrunTransfer Transfers []cmd.DryrunTransfer Raw map[string]bool JsonMode bool } func (d *AzCopyParsedDryrunStdout) Write(p []byte) (n int, err error) { lines := strings.Split(string(p), "\n") for _, str := range lines { if !d.JsonMode && strings.HasPrefix(str, "DRYRUN: ") { if strings.HasPrefix(str, "DRYRUN: warn") { continue } d.Raw[str] = true } else { var out common.JsonOutputTemplate err = json.Unmarshal([]byte(str), &out) if err != nil { continue } if out.MessageType != common.EOutputMessageType.Dryrun().String() { continue } var tx cmd.DryrunTransfer err = json.Unmarshal([]byte(out.MessageContent), &tx) if err != nil { continue } d.Transfers = append(d.Transfers, tx) } } return d.AzCopyRawStdout.Write(p) } type AzCopyParsedJobsListStdout struct { AzCopyParsedStdout listenChan chan<- common.JsonOutputTemplate JobsCount int } func (a *AzCopyParsedJobsListStdout) Write(p []byte) (n int, err error) { if a.listenChan == nil { a.listenChan = a.OnParsedLine.SubscribeFunc(func(line common.JsonOutputTemplate) { if line.MessageType == common.EOutputMessageType.EndOfJob().String() { var tx common.ListJobsResponse err = json.Unmarshal([]byte(line.MessageContent), &tx) if err != nil { return } a.JobsCount = len(tx.JobIDDetails) } }) } return a.AzCopyParsedStdout.Write(p) } type AzCopyParsedLoginStatusStdout struct { AzCopyParsedStdout listenChan chan<- common.JsonOutputTemplate status cmd.LoginStatusOutput } func (a *AzCopyParsedLoginStatusStdout) Write(p []byte) (n int, err error) { if a.listenChan == nil { a.listenChan = a.OnParsedLine.SubscribeFunc(func(line common.JsonOutputTemplate) { if line.MessageType == common.EOutputMessageType.LoginStatusInfo().String() { out := &cmd.LoginStatusOutput{} err = json.Unmarshal([]byte(line.MessageContent), out) if err != nil { return } a.status = *out } }) } return a.AzCopyParsedStdout.Write(p) } var _ AzCopyStdout = &AzCopyInteractiveStdout{} // AzCopyInteractiveStdout is still a semi-raw stdout struct. type AzCopyInteractiveStdout struct { Messages []string asserter Asserter } // NewInteractiveWriter creates a new InteractiveWriter instance. func NewAzCopyInteractiveStdout(a Asserter) *AzCopyInteractiveStdout { return &AzCopyInteractiveStdout{ asserter: a, } } func (a *AzCopyInteractiveStdout) RawStdout() []string { return a.Messages } func (a *AzCopyInteractiveStdout) Write(p []byte) (n int, err error) { str := string(p) lines := strings.Split(strings.TrimSuffix(str, "\n"), "\n") n = len(p) for _, v := range lines { a.Messages = append(a.Messages, v) a.asserter.Log(v) } return } func (a *AzCopyInteractiveStdout) String() string { return strings.Join(a.RawStdout(), "\n") }