agent/framework/processor/executer/outofproc/master.go
package outofproc
import (
"fmt"
"runtime/debug"
"time"
"github.com/aws/amazon-ssm-agent/agent/appconfig"
"github.com/aws/amazon-ssm-agent/agent/context"
"github.com/aws/amazon-ssm-agent/agent/contracts"
"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer"
"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer/basicexecuter"
"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer/outofproc/messaging"
"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer/outofproc/proc"
"github.com/aws/amazon-ssm-agent/agent/log"
logpkg "github.com/aws/amazon-ssm-agent/agent/log/logger"
"github.com/aws/amazon-ssm-agent/agent/task"
"github.com/aws/amazon-ssm-agent/common/filewatcherbasedipc"
"github.com/aws/amazon-ssm-agent/common/identity"
"github.com/aws/amazon-ssm-agent/core/executor"
)
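// Backend is the data processing backend used in messaging with the document worker.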
type Backend messaging.MessagingBackend
// see differences between zombie and orphan: https://www.gmarik.info/blog/2012/orphan-vs-zombie-vs-daemon-processes/
const (
// Give the messaging block ample time to finish processing on its own
// before sending the force-shutdown signal in case it does not exit correctly.
defaultZombieProcessTimeout = 60 * time.Second
// If a process is cancelled, give the cancellation additional time
// to finish on its own before sending the force-shutdown signal.
defaultCancelledProcessTimeout = 300 * time.Second
// maximum command timeout (48 hours) for an orphaned process
defaultOrphanProcessTimeout = 172800 * time.Second
)
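// OutOfProcExecuter runs a document in a separate worker process and communicates with it
// over a file-watcher based ipc channel; it embeds BasicExecuter as the in-proc fallback.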
type OutOfProcExecuter struct {
basicexecuter.BasicExecuter
docState *contracts.DocumentState
ctx context.T
cancelFlag task.CancelFlag
executor executor.IExecutor
}
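// channelCreator creates or re-opens the file-watcher ipc channel for a document;
// the returned bool indicates whether an existing channel was found.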
var channelCreator = func(log log.T, identity identity.IAgentIdentity, mode filewatcherbasedipc.Mode, documentID string) (filewatcherbasedipc.IPCChannel, error, bool) {
return filewatcherbasedipc.CreateFileWatcherChannel(log, identity, mode, documentID, false)
}
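// processFinder reports whether the worker process recorded in ProcInfo is still running.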
var processFinder = func(log log.T, procinfo contracts.OSProcInfo, executor executor.IExecutor) bool {
//if ProcInfo is not initialized, Pid will be 0;
//pid 0 is reserved for the kernel on both Linux and Windows, so the assumption is safe here
if procinfo.Pid == 0 {
return false
}
isRunning, err := executor.IsPidRunning(procinfo.Pid)
if err != nil {
log.Errorf("Failed to query for running process: %v", err)
return false
}
return isRunning
}
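// processCreator starts a worker process with the given executable name and arguments.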
var processCreator = func(name string, argv []string) (proc.OSProcess, error) {
return proc.StartProcess(name, argv)
}
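// NewOutOfProcExecuter creates a new out-of-proc Executer with a tagged context and a process executor.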
func NewOutOfProcExecuter(ctx context.T) *OutOfProcExecuter {
newContext := ctx.With("[OutOfProcExecuter]")
return &OutOfProcExecuter{
BasicExecuter: *basicexecuter.NewBasicExecuter(ctx),
ctx: newContext,
executor: executor.NewProcessExecutor(newContext.Log()),
}
}
// Run prepares the ipc channel, creates a data processing backend, and starts messaging with the document worker.
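// A minimal caller-side sketch (variable names are illustrative, not from this package):
//   exe := NewOutOfProcExecuter(ctx)
//   resChan := exe.Run(cancelFlag, docStore)
//   for res := range resChan {
//       // consume intermediate and final document results; the channel is closed when the Executer is done
//   }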
func (e *OutOfProcExecuter) Run(
cancelFlag task.CancelFlag,
docStore executer.DocumentStore) chan contracts.DocumentResult {
docState := docStore.Load()
e.docState = &docState
e.cancelFlag = cancelFlag
documentID := docState.DocumentInformation.DocumentID
//update context with the document id
e.ctx = e.ctx.With("[" + documentID + "]")
log := e.ctx.Log()
//stopTimer signals the messaging routine to stop; it is buffered so the sender can exit even if messaging has already stopped and is no longer receiving
stopTimer := make(chan bool, 1)
//prepare messaging;
//if anything fails during the prep stage, fall back to the in-proc Executer
ipc, err := e.initialize(stopTimer)
//save the doc store immediately in case the agent restarts.
docStore.Save(*e.docState)
if err != nil {
log.Errorf("failed to prepare outofproc executer, falling back to InProc Executer")
log.WriteEvent(logpkg.AgentTelemetryMessage, "", logpkg.AmazonAgentInProcExecuterStartEvent)
return e.BasicExecuter.Run(cancelFlag, docStore)
} else {
//create reply channel
resChan := make(chan contracts.DocumentResult, len(e.docState.InstancePluginsInformation)+1)
//launch the messaging go-routine
go func(store executer.DocumentStore) {
defer func() {
if msg := recover(); msg != nil {
log.Errorf("Executer go-routine panic: %v", msg)
log.Errorf("Stacktrace:\n%s", debug.Stack())
}
//save the overall result and signal the caller that the Executer is done
store.Save(*e.docState)
log.Debug("Executer closed")
close(resChan)
}()
e.messaging(log, ipc, resChan, cancelFlag, stopTimer)
}(docStore)
return resChan
}
}
// messaging spins up an ipc transmission worker, creates a data processing backend, and hands the backend off to the ipc worker.
// The ipc worker and the data backend act as 2 threads exchanging raw json messages; the messaging protocol lives in the data backend, which is self-contained and exits when the command finishes.
// The Executer, however, holds a timer over the worker so it can forcefully terminate both of them.
func (e *OutOfProcExecuter) messaging(log log.T, ipc filewatcherbasedipc.IPCChannel, resChan chan contracts.DocumentResult, cancelFlag task.CancelFlag, stopTimer chan bool) {
// back up the current time in case out-of-proc execution fails and cannot correctly return a PluginResult
backupStartTime := time.Now()
//hand off reply functionality to the data backend.
backend := messaging.NewExecuterBackend(log, resChan, e.docState, cancelFlag)
//hand off the data backend to the messaging worker
if err := messaging.Messaging(log, ipc, backend, stopTimer); err != nil {
//the messaging worker encountered an error: either the ipc channel failed or the data backend returned an error
log.Errorf("messaging worker encountered error: %v", err)
log.Errorf("document state during messaging worker error: %v", e.docState.DocumentInformation.DocumentStatus)
if e.docState.DocumentInformation.DocumentStatus == contracts.ResultStatusInProgress ||
e.docState.DocumentInformation.DocumentStatus == "" ||
e.docState.DocumentInformation.DocumentStatus == contracts.ResultStatusNotStarted ||
e.docState.DocumentInformation.DocumentStatus == contracts.ResultStatusSuccessAndReboot {
e.docState.DocumentInformation.DocumentStatus = contracts.ResultStatusFailed
log.Info("document failed half way, sending fail message...")
errMsg := fmt.Sprintf("document process failed unexpectedly: %s , check [ssm-document-worker]/[ssm-session-worker] log for crash reason", err)
resChan <- e.generateUnexpectedFailResult(errMsg, backupStartTime)
}
//destroy the channel
ipc.Destroy()
}
}
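// generateUnexpectedFailResult builds a failed DocumentResult for the whole document,
// attaching the error message to the first plugin's result.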
func (e *OutOfProcExecuter) generateUnexpectedFailResult(errMsg string, startTime time.Time) contracts.DocumentResult {
endTime := time.Now()
var docResult contracts.DocumentResult
docResult.MessageID = e.docState.DocumentInformation.MessageID
docResult.AssociationID = e.docState.DocumentInformation.AssociationID
docResult.DocumentName = e.docState.DocumentInformation.DocumentName
docResult.NPlugins = len(e.docState.InstancePluginsInformation)
docResult.DocumentVersion = e.docState.DocumentInformation.DocumentVersion
docResult.Status = contracts.ResultStatusFailed
docResult.PluginResults = make(map[string]*contracts.PluginResult)
res := e.docState.InstancePluginsInformation[0].Result
res.Output = errMsg
res.Status = contracts.ResultStatusFailed
res.StartDateTime = startTime
res.EndDateTime = endTime
docResult.PluginResults[e.docState.InstancePluginsInformation[0].Id] = &res
return docResult
}
// initialize prepares the channel for messaging and launches the document worker process; if the channel already exists, it is re-opened.
// A timeout timer is launched based on the discovered process status.
func (e *OutOfProcExecuter) initialize(stopTimer chan bool) (ipc filewatcherbasedipc.IPCChannel, err error) {
log := e.ctx.Log()
var found bool
documentID := e.docState.DocumentInformation.DocumentID
// this fix deletes channels that were not deleted in a previous execution
if e.docState.DocumentInformation.DocumentStatus == contracts.ResultStatusSuccessAndReboot {
log.Info("deleting channel for reboot document")
if channelErr := filewatcherbasedipc.RemoveFileWatcherChannel(e.ctx.Identity(), documentID); channelErr != nil {
log.Warnf("failed to remove channel directory: %v", channelErr)
}
// change the document status to InProgress now instead of waiting for the worker, to avoid
// the chance that the agent is restarted before the worker updates it, leaving the status as SuccessAndReboot
// and resulting in the agent deleting an in-use file channel when it comes back up
e.docState.DocumentInformation.DocumentStatus = contracts.ResultStatusInProgress
}
ipc, err, found = channelCreator(log, e.ctx.Identity(), filewatcherbasedipc.ModeMaster, documentID)
if err != nil {
log.Errorf("failed to create ipc channel: %v", err)
return
}
if found {
log.Info("discovered old channel object, trying to find detached process...")
var stopTime time.Duration
procInfo := e.docState.DocumentInformation.ProcInfo
if processFinder(log, procInfo, e.executor) {
log.Infof("found orphan process: %v, start time: %v", procInfo.Pid, procInfo.StartTime)
stopTime = defaultOrphanProcessTimeout
} else {
log.Infof("process: %v not found, treat as exited", procInfo.Pid)
stopTime = defaultZombieProcessTimeout
}
go timeoutOrCancel(stopTimer, stopTime, e.cancelFlag)
} else {
log.Debug("channel not found, starting a new process...")
var workerName string
if e.docState.DocumentType == contracts.StartSession {
workerName = appconfig.DefaultSessionWorker
} else {
workerName = appconfig.DefaultDocumentWorker
}
var process proc.OSProcess
if process, err = processCreator(workerName, []string{documentID}); err != nil {
log.Errorf("start process: %v error: %v", workerName, err)
//make sure to close the channel
ipc.Destroy()
return
} else {
log.Debugf("successfully launched new process: %v", process.Pid())
}
e.docState.DocumentInformation.ProcInfo = contracts.OSProcInfo{
Pid: process.Pid(),
StartTime: process.StartTime(),
}
//TODO add a command timeout as well, in case the process gets stuck
go e.WaitForProcess(stopTimer, process)
}
return
}
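// WaitForProcess blocks until the worker process exits, then signals the messaging routine to stop after the zombie-process grace period.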
func (e *OutOfProcExecuter) WaitForProcess(stopTimer chan bool, process proc.OSProcess) {
log := e.ctx.Log()
//TODO revisit this feature, it has the downside of killing the document worker too fast -- the worker might be busy doing an s3 upload
//waitReturned := false
//go func() {
// //if job complete but process still hangs, kill it.
// e.cancelFlag.Wait()
// if !waitReturned && e.cancelFlag.State() == task.Completed {
// //do not kill it immediately, should be grace-period to allow s3 upload to finish
// <-time.After(defaultZombieProcessTimeout)
// log.Info("killing process...")
// process.Kill()
// }
//}()
defer func() {
if r := recover(); r != nil {
log.Errorf("Wait for process panic: %v", r)
log.Errorf("Stacktrace:\n%s", debug.Stack())
}
}()
if err := process.Wait(); err != nil {
log.Errorf("process: %v exited unsuccessfully, error message: %v", process.Pid(), err)
} else {
log.Debugf("process: %v exited successfully, trying to stop messaging worker", process.Pid())
}
//waitReturned = true
timeout(stopTimer, defaultZombieProcessTimeout)
}
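// timeoutOrCancel signals stopTimer after the given duration, or, if the job is cancelled first, after the cancelled-process grace period.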
func timeoutOrCancel(stopTimer chan bool, duration time.Duration, cancelFlag task.CancelFlag) {
stopChan := make(chan bool, 1)
//TODO refactor cancelFlag.Wait() to return a channel instead of making a blocking call
go func() {
cancelFlag.Wait()
stopChan <- true
}()
select {
case <-time.After(duration):
stopTimer <- true
case <-stopChan:
go timeout(stopTimer, defaultCancelledProcessTimeout)
}
}
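// timeout signals stopTimer after the given duration elapses.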
func timeout(stopTimer chan bool, duration time.Duration) {
<-time.After(duration)
stopTimer <- true
}