// common/lifecyleMgr.go
package common
import (
"bufio"
"encoding/json"
"fmt"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
"unicode"
)
// only one instance of the formatter should exist
// only one instance of the formatter should exist; it is created once, at
// package init, and its two service goroutines (output + input) live for the
// whole life of the process.
var lcm = func() (lcmgr *lifecycleMgr) {
	lcmgr = &lifecycleMgr{
		msgQueue:      make(chan outputMessage, 1000),
		progressCache: "",
		cancelChannel: make(chan os.Signal, 1),
		// FIX: doneChannel was never initialized, so ReportAllJobPartsDone
		// blocked forever sending on a nil channel (and the doneChannel arm of
		// the progress-reporting select could never fire). Buffer of 1 lets the
		// sender proceed even if the progress goroutine is mid-report.
		doneChannel:          make(chan bool, 1),
		e2eContinueChannel:   make(chan struct{}),
		e2eAllowOpenChannel:  make(chan struct{}),
		outputFormat:         EOutputFormat.Text(), // output text by default
		logSanitizer:         NewAzCopyLogSanitizer(),
		inputQueue:           make(chan userInput, 1000),
		allowCancelFromStdIn: false,
		allowWatchInput:      false,
		closeFunc:            func() {}, // noop since we have nothing to do by default
		waitForUserResponse:  make(chan bool),
		msgHandlerChannel:    make(chan *LCMMsg),
	}
	// kick off the single routine that processes output
	go lcmgr.processOutputMessage()
	// and process input
	go lcmgr.watchInputs()
	// Check if need to do CPU profiling, and do CPU profiling accordingly when azcopy life start.
	lcmgr.checkAndStartCPUProfiling()
	return
}()
// create a public interface so that consumers outside of this package can refer to the lifecycle manager
// but they would not be able to instantiate one
// LifecycleMgr is the public face of the lifecycle manager. Consumers outside
// this package program against this interface but cannot instantiate their own
// manager: the only implementation (lifecycleMgr) is unexported and created
// once at package init.
type LifecycleMgr interface {
	Init(OutputBuilder)                                 // let the user know the job has started and initial information like log location
	Progress(OutputBuilder)                             // print on the same line over and over again, not allowed to float up
	Exit(OutputBuilder, ExitCode)                       // indicates successful execution exit after printing, allow user to specify exit code
	Info(string)                                        // simple print, allowed to float up
	Warn(string)                                        // simple print, allowed to float up
	Dryrun(OutputBuilder)                               // print files for dry run mode
	Output(OutputBuilder, OutputMessageType)            // print output for list
	Error(string)                                       // indicates fatal error, exit after printing, exit code is always Failed (1)
	Prompt(message string, details PromptDetails) ResponseOption // ask the user a question(after erasing the progress), then return the response
	SurrenderControl()                                  // give up control, this should never return
	InitiateProgressReporting(WorkController)           // start writing progress with another routine
	AllowReinitiateProgressReporting()                  // allow re-initiation of progress reporting for followup job
	SetOutputFormat(OutputFormat)                       // change the output format of the entire application
	EnableInputWatcher()                                // depending on the command, we may allow user to give input through Stdin
	EnableCancelFromStdIn()                             // allow user to send in `cancel` to stop the job
	E2EAwaitContinue()                                  // used by E2E tests
	E2EAwaitAllowOpenFiles()                            // used by E2E tests
	E2EEnableAwaitAllowOpenFiles(enable bool)           // used by E2E tests
	RegisterCloseFunc(func())                           // chain a function to run (e.g. close logs) before the process exits
	SetForceLogging()                                   // read AZCOPY_DISABLE_SYSLOG once and cache the result
	IsForceLoggingDisabled() bool                       // report the cached AZCOPY_DISABLE_SYSLOG value
	MsgHandlerChannel() <-chan *LCMMsg                  // receive-only channel of messages parsed from stdin
	ReportAllJobPartsDone()                             // tell the progress loop to do a final report/exit check now
	SetOutputVerbosity(mode OutputVerbosity)            // control which message types are quieted
}
// GetLifecycleMgr returns the process-wide singleton lifecycle manager.
func GetLifecycleMgr() LifecycleMgr {
	return lcm
}
// single point of control for all outputs
// single point of control for all outputs
type lifecycleMgr struct {
	msgQueue             chan outputMessage // all output flows through here; processOutputMessage is the sole consumer
	progressCache        string             // useful for keeping job progress on the last line
	cancelChannel        chan os.Signal     // receives os.Interrupt/SIGTERM and stdin "cancel" requests
	doneChannel          chan bool          // signaled by ReportAllJobPartsDone to prompt an immediate progress check
	e2eContinueChannel   chan struct{}      // closed when the user types "continue" (E2E debugging aid)
	e2eAllowOpenChannel  chan struct{}      // closed when the user types "open" (E2E pause-before-first-file aid)
	waitEverCalled       int32              // accessed atomically; guards against two progress-reporting goroutines
	outputFormat         OutputFormat
	logSanitizer         LogSanitizer
	inputQueue           chan userInput // msgs from the user
	allowWatchInput      bool           // accept user inputs and place then in the inputQueue
	allowCancelFromStdIn bool           // allow user to send in 'cancel' from the stdin to stop the current job
	e2eAllowAwaitContinue bool          // allow the user to send 'continue' from stdin to start the current job
	e2eAllowAwaitOpen    bool           // allow the user to send 'open' from stdin to allow the opening of the first file
	closeFunc            func()         // used to close logs before exiting
	disableSyslog        bool           // cached value of AZCOPY_DISABLE_SYSLOG (see SetForceLogging)
	waitForUserResponse  chan bool      // Prompt signals here so watchInputs routes the next line to inputQueue
	msgHandlerChannel    chan *LCMMsg   // JSON messages from stdin that are not handled internally
	OutputVerbosityType  OutputVerbosity
}
// userInput is one line read from stdin, stamped with its arrival time so that
// an answer can be matched to the question that was asked before it (see
// getInputAfterTime).
type userInput struct {
	timeReceived time.Time
	content      string
}
// should be started in a single go routine
// watchInputs is the single reader of os.Stdin and must run in exactly one
// goroutine (started at package init). Each line the user types is routed to
// one of:
//   - a pending Prompt (when waitForUserResponse has been signaled),
//   - the cancel channel ("cancel", when allowed),
//   - the E2E continue/open channels ("continue"/"open", when allowed),
//   - the message-handler channel (a JSON-encoded LCMMsgReq);
// anything else is discarded with an informational message.
func (lcm *lifecycleMgr) watchInputs() {
	consoleReader := bufio.NewReader(os.Stdin)
	for {
		// sleep for a bit, the option might be enabled later.
		// FIX: this previously slept time.Microsecond * 500 (~2000 wakeups per
		// second), busy-spinning the CPU while input watching was disabled.
		if !lcm.allowWatchInput {
			time.Sleep(time.Millisecond * 500)
			continue
		}
		// reads input until the first occurrence of \n in the input,
		input, err := consoleReader.ReadString('\n')
		if err != nil {
			// FIX: back off before retrying. Once stdin is closed, ReadString
			// returns io.EOF immediately forever, and retrying with no pause
			// turned this loop into a 100%-CPU spin.
			time.Sleep(time.Millisecond * 500)
			continue
		}
		// remove spaces before/after the content
		msg := strings.TrimSpace(input)
		timeReceived := time.Now()
		// if a Prompt is currently waiting for an answer, hand the line to it verbatim
		select {
		case <-lcm.waitForUserResponse:
			lcm.inputQueue <- userInput{timeReceived: timeReceived, content: msg}
			continue
		default:
		}
		// ignore lines that are entirely whitespace
		// (after TrimSpace above this is equivalent to msg == "")
		allCharsAreWhiteSpace := true
		for _, ch := range msg {
			if !unicode.IsSpace(ch) {
				allCharsAreWhiteSpace = false
				break
			}
		}
		if allCharsAreWhiteSpace {
			continue
		}
		var req LCMMsgReq
		if lcm.allowCancelFromStdIn && strings.EqualFold(msg, "cancel") {
			lcm.cancelChannel <- os.Interrupt
		} else if lcm.e2eAllowAwaitContinue && strings.EqualFold(msg, "continue") {
			close(lcm.e2eContinueChannel)
		} else if lcm.e2eAllowAwaitOpen && strings.EqualFold(msg, "open") {
			close(lcm.e2eAllowOpenChannel)
		} else if err := json.Unmarshal([]byte(msg), &req); err == nil { // json string
			lcm.Info(fmt.Sprintf("Received request for %s with timeStamp %s", req.MsgType, req.TimeStamp.String()))
			var msgType LCMMsgType
			if err := msgType.Parse(req.MsgType); err != nil {
				lcm.Info(fmt.Sprintf("Discarding incorrect message: %s.", req.MsgType))
				continue
			}
			switch msgType {
			case ELCMMsgType.CancelJob():
				lcm.cancelChannel <- os.Interrupt
			default:
				// dispatch to whoever is listening on MsgHandlerChannel, then
				// wait till the message is completed before echoing the response
				m := NewLCMMsg()
				m.Req = &req
				lcm.msgHandlerChannel <- m
				<-m.respChan
				lcm.Response(*m.Resp)
			}
		} else {
			lcm.Info("Discarding incorrectly formatted input message")
		}
	}
}
// get the answer to a question that was asked at a certain time
// only user input after the specified time is returned to make sure that we are getting the right answer to our question
// NOTE: to ask a question, go through Prompt, to guarantee that only 1 question is asked at a time
// getInputAfterTime returns the first user input typed after the given instant.
// Inputs stamped earlier are discarded, so a stale line cannot be mistaken for
// the answer to a question asked later.
// NOTE: to ask a question, go through Prompt, which guarantees that only one
// question is outstanding at a time.
func (lcm *lifecycleMgr) getInputAfterTime(after time.Time) string {
	for {
		// block until something arrives; keep draining until an entry is
		// recent enough — the user may simply not have typed anything yet
		if in := <-lcm.inputQueue; in.timeReceived.After(after) {
			return in.content
		}
	}
}
// EnableInputWatcher turns on stdin reading in the watchInputs goroutine.
// Not goroutine-safe, but callers set it once during command start-up.
func (lcm *lifecycleMgr) EnableInputWatcher() {
	lcm.allowWatchInput = true
}
// EnableCancelFromStdIn allows the user to type "cancel" on stdin to stop the
// current job (see watchInputs).
func (lcm *lifecycleMgr) EnableCancelFromStdIn() {
	lcm.allowCancelFromStdIn = true
}
// SetOutputFormat changes the output format (text/json/none) for the whole
// application. Expected to be called once, before output begins.
func (lcm *lifecycleMgr) SetOutputFormat(format OutputFormat) {
	lcm.outputFormat = format
}
// checkAndStartCPUProfiling starts CPU profiling when requested via the
// environment. Set AZCOPY_PROFILE_CPU to the path where profiling data should
// be saved, e.g. export AZCOPY_PROFILE_CPU="cpu.prof".
// For more details, please refer to https://golang.org/pkg/runtime/pprof/
func (lcm *lifecycleMgr) checkAndStartCPUProfiling() {
	profilePath := GetEnvironmentVariable(EEnvironmentVariable.ProfileCPU())
	if profilePath == "" {
		return // profiling not requested
	}
	lcm.Info(fmt.Sprintf("pprof start CPU profiling, and saving profiling data to: %q", profilePath))
	// note: lcm.Error never returns (it surrenders control), so a failed
	// Create cannot fall through to StartCPUProfile with a nil file
	profileFile, err := os.Create(profilePath)
	if err != nil {
		lcm.Error(fmt.Sprintf("Fail to create file for CPU profiling, %v", err))
	}
	if err := pprof.StartCPUProfile(profileFile); err != nil {
		lcm.Error(fmt.Sprintf("Fail to start CPU profiling, %v", err))
	}
}
// checkAndStopCPUProfiling stops CPU profiling if there is ongoing CPU
// profiling; pprof.StopCPUProfile is documented to be a no-op otherwise.
func (lcm *lifecycleMgr) checkAndStopCPUProfiling() {
	// Stop CPU profiling if there is ongoing CPU profiling.
	pprof.StopCPUProfile()
}
// checkAndTriggerMemoryProfiling writes a heap profile when requested via the
// environment. Set AZCOPY_PROFILE_MEM to the path where profiling data should
// be saved, e.g. export AZCOPY_PROFILE_MEM="mem.prof".
// For more details, please refer to https://golang.org/pkg/runtime/pprof/
func (lcm *lifecycleMgr) checkAndTriggerMemoryProfiling() {
	profilePath := GetEnvironmentVariable(EEnvironmentVariable.ProfileMemory())
	if profilePath == "" {
		return // profiling not requested
	}
	lcm.Info(fmt.Sprintf("pprof start memory profiling, and saving profiling data to: %q", profilePath))
	profileFile, err := os.Create(profilePath)
	if err != nil {
		// lcm.Error never returns, so nothing below runs on failure
		lcm.Error(fmt.Sprintf("Fail to create file for memory profiling, %v", err))
	}
	// force a GC first so the profile reflects live objects, not garbage
	runtime.GC()
	if err := pprof.WriteHeapProfile(profileFile); err != nil {
		lcm.Error(fmt.Sprintf("Fail to start memory profiling, %v", err))
	}
	if err := profileFile.Close(); err != nil {
		lcm.Info(fmt.Sprintf("Fail to close memory profiling file, %v", err))
	}
}
// Init announces that the job has started, along with initial information such
// as the log location. Note: unlike Progress/Dryrun/Output, the builder is not
// nil-checked here — callers must pass a non-nil OutputBuilder.
func (lcm *lifecycleMgr) Init(o OutputBuilder) {
	lcm.msgQueue <- outputMessage{
		msgContent: o(lcm.outputFormat),
		msgType:    EOutputMessageType.Init(),
	}
}
// Progress prints job progress, repeatedly overwriting the same console line
// (the text handler keeps it pinned to the bottom). A nil builder produces an
// empty progress message.
func (lcm *lifecycleMgr) Progress(o OutputBuilder) {
	var content string
	if o != nil {
		content = o(lcm.outputFormat)
	}
	lcm.msgQueue <- outputMessage{
		msgContent: content,
		msgType:    EOutputMessageType.Progress(),
	}
}
// Info prints an informational line that is allowed to float up past the
// pinned progress line. The message is sanitized here as well as in Error,
// because error-like text sometimes flows through Info before the final
// "we've failed, please stop now" signal reaches Error.
func (lcm *lifecycleMgr) Info(msg string) {
	sanitized := lcm.logSanitizer.SanitizeLogMessage(msg)
	lcm.msgQueue <- outputMessage{
		msgContent: fmt.Sprintf("INFO: %v", sanitized),
		msgType:    EOutputMessageType.Info(),
	}
}
// Warn prints a warning line that is allowed to float up past the pinned
// progress line. Sanitized for the same reason as Info. Note it is routed with
// the Info message type deliberately — there is no separate "warn" type.
func (lcm *lifecycleMgr) Warn(msg string) {
	sanitized := lcm.logSanitizer.SanitizeLogMessage(msg)
	lcm.msgQueue <- outputMessage{
		msgContent: fmt.Sprintf("WARN: %v", sanitized),
		msgType:    EOutputMessageType.Info(),
	}
}
// Prompt asks the user a question (after erasing the in-progress line) and
// blocks until a response arrives on stdin, then maps it to one of the offered
// ResponseOptions. Only one prompt should be outstanding at a time.
func (lcm *lifecycleMgr) Prompt(message string, details PromptDetails) ResponseOption {
	// buffered so the output goroutine's send cannot block if we are slow to read
	expectedInputChannel := make(chan string, 1)
	lcm.msgQueue <- outputMessage{
		msgContent:    message,
		msgType:       EOutputMessageType.Prompt(),
		inputChannel:  expectedInputChannel,
		promptDetails: details,
	}
	// Request watchInputs() to wait for response from user
	// (this routes the next stdin line into inputQueue instead of the normal parsing)
	lcm.waitForUserResponse <- true
	// block until input comes from the user
	// (the output handler reads from inputQueue and forwards it here)
	rawResponse := <-expectedInputChannel
	// match the given response against one of the options we gave
	for _, option := range details.ResponseOptions {
		// in case the user misunderstood and typed full response type instead, we still tolerate it
		// e.g. instead of "y", user typed "Yes"
		if strings.EqualFold(option.ResponseString, rawResponse) ||
			strings.EqualFold(option.UserFriendlyResponseType, rawResponse) {
			return option
		}
	}
	// nothing matched our options, assume default behavior (up to whoever that called Prompt)
	// we don't re-prompt the user since this makes the integration with Stg Exp more complex
	return EResponseOption.Default()
}
// Dryrun prints the files that would be processed in dry-run mode. A nil
// builder produces an empty message.
func (lcm *lifecycleMgr) Dryrun(o OutputBuilder) {
	var content string
	if o != nil {
		content = o(lcm.outputFormat)
	}
	lcm.msgQueue <- outputMessage{
		msgContent: content,
		msgType:    EOutputMessageType.Dryrun(),
	}
}
// Output prints a message of the caller-supplied type (used e.g. for list
// output). A nil builder produces an empty message.
func (lcm *lifecycleMgr) Output(o OutputBuilder, msgType OutputMessageType) {
	var content string
	if o != nil {
		content = o(lcm.outputFormat)
	}
	lcm.msgQueue <- outputMessage{
		msgContent: content,
		msgType:    msgType,
	}
}
// TODO minor: consider merging with Exit
// Error reports a fatal error and never returns: the output goroutine prints
// the message and terminates the process with exit code Failed (1), while this
// goroutine stalls in SurrenderControl.
func (lcm *lifecycleMgr) Error(msg string) {
	msg = lcm.logSanitizer.SanitizeLogMessage(msg)
	// Check if need to do memory profiling, and do memory profiling accordingly before azcopy exits.
	lcm.checkAndTriggerMemoryProfiling()
	// Check if there is ongoing CPU profiling, and stop CPU profiling.
	lcm.checkAndStopCPUProfiling()
	lcm.msgQueue <- outputMessage{
		msgContent: msg,
		msgType:    EOutputMessageType.Error(),
		exitCode:   EExitCode.Error(),
	}
	// stall forever until the success message is printed and program exits
	lcm.SurrenderControl()
}
// Exit reports the end of the job and, unless the caller passed NoExit,
// terminates the process with the given exit code. The actual os.Exit happens
// on the output goroutine; this goroutine stalls in SurrenderControl.
func (lcm *lifecycleMgr) Exit(o OutputBuilder, applicationExitCode ExitCode) {
	reallyExiting := applicationExitCode != EExitCode.NoExit()
	if reallyExiting {
		// flush profiling data before the process goes away:
		// dump the heap profile if requested, then stop any CPU profiling
		lcm.checkAndTriggerMemoryProfiling()
		lcm.checkAndStopCPUProfiling()
	}
	var content string
	if o != nil {
		content = o(lcm.outputFormat)
	}
	lcm.msgQueue <- outputMessage{
		msgContent: content,
		msgType:    EOutputMessageType.EndOfJob(),
		exitCode:   applicationExitCode,
	}
	if reallyExiting && AzcopyCurrentJobLogger != nil {
		AzcopyCurrentJobLogger.CloseLog()
	}
	if reallyExiting {
		// stall forever until the message is printed and the program exits
		lcm.SurrenderControl()
	}
}
// Response echoes the reply to an LCM message back to the user, serialized
// according to the configured output format.
func (lcm *lifecycleMgr) Response(resp LCMMsgResp) {
	var respMsg string
	if lcm.outputFormat == EOutputFormat.Json() {
		m, err := json.Marshal(resp)
		// FIX: check the error before using m — previously string(m) was formed
		// first, so on a marshal failure a meaningless empty message was built
		// just before the panic
		PanicIfErr(err)
		respMsg = string(m)
	} else {
		respMsg = fmt.Sprintf("INFO: %v", resp.Value.String())
	}
	// sanitize the response in case it carries sensitive content
	respMsg = lcm.logSanitizer.SanitizeLogMessage(respMsg)
	lcm.msgQueue <- outputMessage{
		msgContent: respMsg,
		msgType:    EOutputMessageType.Response(),
	}
}
// this is used by commands that wish to stall forever to wait for the operations to complete
// SurrenderControl never returns: an empty select blocks the calling goroutine
// permanently, leaving termination to the output goroutine's os.Exit.
func (lcm *lifecycleMgr) SurrenderControl() {
	// stall forever
	select {}
}
// RegisterCloseFunc chains closeFunc onto any previously registered close
// function; on exit the functions run in registration order. (The constructor
// installs a no-op, so in practice the nil branch is a safety net.)
func (lcm *lifecycleMgr) RegisterCloseFunc(closeFunc func()) {
	if lcm.closeFunc == nil {
		lcm.closeFunc = closeFunc
		return
	}
	// capture the existing function so the new wrapper calls it first
	previous := lcm.closeFunc
	lcm.closeFunc = func() {
		previous()
		closeFunc()
	}
}
// processOutputMessage runs forever on its own goroutine, pulling queued
// messages and dispatching each to the handler for the active output format.
// Quieted messages still go through the "none" handler so that error/exit
// messages terminate the process even when suppressed.
func (lcm *lifecycleMgr) processOutputMessage() {
	for msg := range lcm.msgQueue { // the queue is never closed, so this loops forever
		if shouldQuietMessage(msg, lcm.OutputVerbosityType) {
			lcm.processNoneOutput(msg)
			continue
		}
		switch lcm.outputFormat {
		case EOutputFormat.Json():
			lcm.processJSONOutput(msg)
		case EOutputFormat.Text():
			lcm.processTextOutput(msg)
		case EOutputFormat.None():
			lcm.processNoneOutput(msg)
		default:
			panic("unimplemented output format")
		}
	}
}
// processNoneOutput handles a message when output is suppressed: nothing is
// printed, but fatal/exit messages must still close the logs and terminate
// the process. Everything else is dropped.
func (lcm *lifecycleMgr) processNoneOutput(msgToOutput outputMessage) {
	switch {
	case msgToOutput.msgType == EOutputMessageType.Error():
		lcm.closeFunc()
		os.Exit(int(EExitCode.Error()))
	case msgToOutput.shouldExitProcess():
		lcm.closeFunc()
		os.Exit(int(msgToOutput.exitCode))
	}
	// ignore all other outputs
}
// processJSONOutput emits one message as a JSON envelope. For prompts it then
// waits for the user's answer and forwards it back through the message's
// input channel; for exit-type messages it terminates the process.
func (lcm *lifecycleMgr) processJSONOutput(msgToOutput outputMessage) {
	// stamp the time before printing, so any input typed after the prompt
	// appears is accepted as the answer
	questionTime := time.Now()
	kind := msgToOutput.msgType
	// the msgContent is assumed to already be formatted correctly for JSON
	fmt.Println(GetJsonStringFromTemplate(newJsonOutputTemplate(kind, msgToOutput.msgContent,
		msgToOutput.promptDetails)))
	if msgToOutput.shouldExitProcess() {
		lcm.closeFunc()
		os.Exit(int(msgToOutput.exitCode))
	}
	if kind == EOutputMessageType.Prompt() {
		// read the response to the prompt and send it back through the channel
		msgToOutput.inputChannel <- lcm.getInputAfterTime(questionTime)
	}
}
// processTextOutput renders one message for a human terminal. The tricky part
// is keeping the progress status pinned to the last line: any other message
// must overwrite the progress line in place (erasing leftovers if it is
// shorter) and then re-print the progress below it.
func (lcm *lifecycleMgr) processTextOutput(msgToOutput outputMessage) {
	// when a new line needs to overwrite the current line completely
	// we need to make sure that if the new line is shorter, we properly erase everything from the current line
	var matchLengthWithSpaces = func(curLineLength, newLineLength int) {
		if dirtyLeftover := curLineLength - newLineLength; dirtyLeftover > 0 {
			for i := 0; i < dirtyLeftover; i++ {
				fmt.Print(" ")
			}
		}
	}
	switch msgToOutput.msgType {
	case EOutputMessageType.Error(), EOutputMessageType.EndOfJob():
		// simply print and quit
		// if no message is intended, avoid adding new lines
		if msgToOutput.msgContent != "" {
			fmt.Println("\n" + msgToOutput.msgContent)
		}
		if msgToOutput.shouldExitProcess() {
			lcm.closeFunc()
			os.Exit(int(msgToOutput.exitCode))
		}
	case EOutputMessageType.Progress():
		fmt.Print("\r")                  // return carriage back to start
		fmt.Print(msgToOutput.msgContent) // print new progress
		// it is possible that the new progress status is somehow shorter than the previous one
		// in this case we must erase the left over characters from the previous progress
		matchLengthWithSpaces(len(lcm.progressCache), len(msgToOutput.msgContent))
		lcm.progressCache = msgToOutput.msgContent
	case EOutputMessageType.Prompt():
		// stamp the time before printing so input typed after the prompt counts
		questionTime := time.Now()
		if lcm.progressCache != "" { // a progress status is already on the last line
			// print the prompt from the beginning on current line
			fmt.Print("\r")
			fmt.Print(msgToOutput.msgContent)
			// it is possible that the prompt is shorter than the progress status
			// in this case we must erase the left over characters from the progress status
			matchLengthWithSpaces(len(lcm.progressCache), len(msgToOutput.msgContent))
		} else {
			fmt.Print(msgToOutput.msgContent)
		}
		// example output: Please confirm with: [Y] Yes [N] No [A] Yes for all [L] No for all
		fmt.Print(" Please confirm with:")
		for _, option := range msgToOutput.promptDetails.ResponseOptions {
			fmt.Printf(" [%s] %s ", strings.ToUpper(option.ResponseString), option.UserFriendlyResponseType)
		}
		// read the response to the prompt and send it back through the channel
		msgToOutput.inputChannel <- lcm.getInputAfterTime(questionTime)
	default:
		// Init, Info, Dryrun, Response, ListSummary, ListObject, and any other new message types will use default
		if lcm.progressCache != "" { // a progress status is already on the last line
			// print the info from the beginning on current line
			fmt.Print("\r")
			fmt.Print(msgToOutput.msgContent)
			// it is possible that the info is shorter than the progress status
			// in this case we must erase the left over characters from the progress status
			matchLengthWithSpaces(len(lcm.progressCache), len(msgToOutput.msgContent))
			// print the previous progress status again, so that it's on the last line
			fmt.Print("\n")
			fmt.Print(lcm.progressCache)
		} else {
			fmt.Println(msgToOutput.msgContent)
		}
	}
}
// for the lifecycleMgr to babysit a job, it must be given a controller to get information about the job
type WorkController interface {
	Cancel(mgr LifecycleMgr)                                        // handle to cancel the work
	ReportProgressOrExit(mgr LifecycleMgr) (totalKnownCount uint32) // print the progress status, optionally exit the application if work is done
}
// AllowReinitiateProgressReporting must be called before running an cleanup job, to allow the initiation of that job's
// progress reporting to begin. It resets the one-shot guard that
// InitiateProgressReporting checks via compare-and-swap.
func (lcm *lifecycleMgr) AllowReinitiateProgressReporting() {
	atomic.StoreInt32(&lcm.waitEverCalled, 0)
}
// InitiateProgressReporting starts a goroutine that periodically reports job
// progress via the given WorkController and reacts to cancellation signals.
// The atomic compare-and-swap makes it a one-shot: repeated calls are no-ops
// until AllowReinitiateProgressReporting resets the guard.
func (lcm *lifecycleMgr) InitiateProgressReporting(jc WorkController) {
	if !atomic.CompareAndSwapInt32(&lcm.waitEverCalled, 0, 1) {
		return
	}
	// this go routine never returns
	// it will terminate the whole process eventually when the work is complete
	// (ReportProgressOrExit is expected to exit the app when the job is done)
	go func() {
		// above this many files, progress is reported far less frequently
		const progressFrequencyThreshold = 1000000
		var oldCount, newCount uint32
		wait := 2 * time.Second
		lastFetchTime := time.Now().Add(-wait) // So that we start fetching time immediately
		// cancelChannel will be notified when os receives os.Interrupt and os.Kill signals
		signal.Notify(lcm.cancelChannel, os.Interrupt, syscall.SIGTERM)
		cancelCalled := false
		doCancel := func() {
			cancelCalled = true
			lcm.Info("Cancellation requested. Beginning clean shutdown...")
			jc.Cancel(lcm)
		}
		for {
			select {
			case <-lcm.cancelChannel:
				doCancel()
				continue // to exit on next pass through loop
			case <-lcm.doneChannel:
				// all job parts reported done: fetch progress right away
				newCount = jc.ReportProgressOrExit(lcm)
				lastFetchTime = time.Now()
			case <-time.After(wait):
				// NOTE(review): time.After allocates a fresh timer each pass;
				// harmless at this cadence, but a reused Timer would avoid it
				if time.Since(lastFetchTime) >= wait {
					newCount = jc.ReportProgressOrExit(lcm)
					lastFetchTime = time.Now()
				}
			}
			if newCount >= progressFrequencyThreshold && !cancelCalled {
				// report less on progress - to save on the CPU costs of doing so and because, if there are this many files,
				// its going to be a long job anyway, so no need to report so often
				wait = 2 * time.Minute
				if oldCount < progressFrequencyThreshold {
					lcm.Info(fmt.Sprintf("Reducing progress output frequency to %v, because there are over %d files", wait, progressFrequencyThreshold))
				}
			}
			oldCount = newCount
		}
	}()
}
// awaitChannel blocks until ch is closed (or receives), or until timeout
// elapses — whichever happens first. It does not report which occurred.
func (*lifecycleMgr) awaitChannel(ch chan struct{}, timeout time.Duration) {
	select {
	case <-time.After(timeout):
	case <-ch:
	}
}
// E2EAwaitContinue is used in case where a developer wants to debug AzCopy by attaching to the running process,
// before it starts doing any actual work. It blocks until the user types
// "continue" on stdin (which closes e2eContinueChannel) or one minute passes.
func (lcm *lifecycleMgr) E2EAwaitContinue() {
	lcm.e2eAllowAwaitContinue = true // not technically gorountine safe (since its shared state) but its consistent with EnableInputWatcher
	lcm.EnableInputWatcher()
	lcm.awaitChannel(lcm.e2eContinueChannel, time.Minute)
}
// E2EAwaitAllowOpenFiles is used in cases where we want to artificially produce a pause between enumeration and sending
// of the first file, for test purposes. (It only achieves that effect when the total file count is <= size of one job part).
// Does not pause at all, unless the feature has been enabled with a command-line flag
// (when disabled, E2EEnableAwaitAllowOpenFiles closes the channel so this returns instantly).
func (lcm *lifecycleMgr) E2EAwaitAllowOpenFiles() {
	lcm.awaitChannel(lcm.e2eAllowOpenChannel, 5*time.Minute)
}
// E2EEnableAwaitAllowOpenFiles turns the open-files pause on (arming stdin
// watching for the "open" keyword) or off (closing the channel so waits
// return immediately). Call at most once with enable=false — a second close
// would panic.
func (lcm *lifecycleMgr) E2EEnableAwaitAllowOpenFiles(enable bool) {
	if enable {
		lcm.e2eAllowAwaitOpen = true // not technically gorountine safe (since its shared state) but its consistent with EnableInputWatcher
		lcm.EnableInputWatcher()
	} else {
		close(lcm.e2eAllowOpenChannel) // so that E2EAwaitAllowOpenFiles will instantly return every time
	}
}
// SetForceLogging fetches `AZCOPY_DISABLE_SYSLOG` from the environment once
// and caches the parsed value on the manager, so later IsForceLoggingDisabled
// calls avoid redundant environment lookups.
func (lcm *lifecycleMgr) SetForceLogging() {
	raw := GetEnvironmentVariable(EEnvironmentVariable.DisableSyslog())
	parsed, err := strconv.ParseBool(raw)
	// On an unset or unparseable value we retain the current behaviour,
	// i.e. keep logging to Syslog/WindowsEventLog (disableSyslog = false).
	lcm.disableSyslog = err == nil && parsed
}
// IsForceLoggingDisabled reports the cached AZCOPY_DISABLE_SYSLOG value
// captured by SetForceLogging.
func (lcm *lifecycleMgr) IsForceLoggingDisabled() bool {
	return lcm.disableSyslog
}
// MsgHandlerChannel exposes (receive-only) the channel on which watchInputs
// delivers JSON messages from stdin that are not handled internally.
func (lcm *lifecycleMgr) MsgHandlerChannel() <-chan *LCMMsg {
	return lcm.msgHandlerChannel
}
// ReportAllJobPartsDone tells the progress-reporting loop to fetch progress
// (and possibly exit) immediately rather than waiting for the next tick.
// NOTE(review): this send blocks until the loop receives unless doneChannel is
// initialized with a buffer — verify the channel is made in the constructor.
func (lcm *lifecycleMgr) ReportAllJobPartsDone() {
	lcm.doneChannel <- true
}
// SetOutputVerbosity controls which message types are quieted
// (see shouldQuietMessage). Expected to be set once, before output begins.
func (lcm *lifecycleMgr) SetOutputVerbosity(mode OutputVerbosity) {
	lcm.OutputVerbosityType = mode
}
// PanicIfErr captures the common logic of aborting on an unexpected error:
// it panics with the error if err is non-nil, and does nothing otherwise.
func PanicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// shouldQuietMessage reports whether a message should be suppressed under the
// given verbosity: Quiet drops everything, Essential drops progress/info/
// prompt chatter (keeping errors and summaries), and Default (or anything
// unrecognized) prints everything.
func shouldQuietMessage(msgToOutput outputMessage, quietMode OutputVerbosity) bool {
	mt := msgToOutput.msgType
	switch quietMode {
	case EOutputVerbosity.Quiet():
		return true
	case EOutputVerbosity.Essential():
		return mt == EOutputMessageType.Progress() || mt == EOutputMessageType.Info() || mt == EOutputMessageType.Prompt()
	default:
		return false
	}
}