// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package processor defines the document processing unit interface
package processor
import (
"fmt"
"os"
"path/filepath"
"runtime/debug"
"sync"
"time"
"github.com/aws/amazon-ssm-agent/agent/appconfig"
"github.com/aws/amazon-ssm-agent/agent/context"
"github.com/aws/amazon-ssm-agent/agent/contracts"
"github.com/aws/amazon-ssm-agent/agent/fileutil"
"github.com/aws/amazon-ssm-agent/agent/framework/docmanager"
"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer"
"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer/outofproc"
"github.com/aws/amazon-ssm-agent/agent/log"
"github.com/aws/amazon-ssm-agent/agent/longrunning/manager"
"github.com/aws/amazon-ssm-agent/agent/rebooter"
"github.com/aws/amazon-ssm-agent/agent/task"
"github.com/aws/amazon-ssm-agent/agent/times"
)
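// ExecuterCreator creates an Executer bound to the given context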
type ExecuterCreator func(ctx context.T) executer.Executer
// ErrorCode represents processor related error codes
type ErrorCode string
const (
// hardStopTimeout is the time the processor waits before shutting down during a hardstop
hardStopTimeout = time.Second * 4
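// maxDocumentTimeOutHour is the maximum age of an in-progress document before it is treated as expired and skipped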
maxDocumentTimeOutHour = time.Hour * 48
// CommandBufferFull denotes that the cancel command buffer is full
CommandBufferFull ErrorCode = "CommandBufferFull"
// ClosedProcessor denotes that the processor is closed
ClosedProcessor ErrorCode = "ClosedProcessor"
// UnsupportedDocType represents unsupported doc type
UnsupportedDocType ErrorCode = "UnsupportedDocType"
// DuplicateCommand represents duplicate command in the buffer
DuplicateCommand ErrorCode = "DuplicateCommand"
// InvalidDocumentId represents invalid document id
InvalidDocumentId ErrorCode = "InvalidDocumentId"
// ConversionFailed denotes that converting a pool error code to a processor error code failed
ConversionFailed ErrorCode = "ConversionFailed"
// SubmissionPanic represents panic during submission to the pool
SubmissionPanic ErrorCode = "SubmissionPanic"
)
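// Processor is the document processing unit: it accepts documents, runs them through worker pools, and streams results back to the caller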
type Processor interface {
//Start activates the Processor and picks up leftover documents from the last run; it returns a channel to the caller for gathering DocumentResults
Start() (chan contracts.DocumentResult, error)
//InitialProcessing processes any initial documents loaded from the file directory. This should be run after Start().
InitialProcessing(skipDocumentIfExpired bool) error
//Stop stops the processor and saves the current state so it can be resumed later
Stop()
//Submit sends a document, in the form of a docState object, to the pool; results are streamed back on the central channel returned by Start()
Submit(docState contracts.DocumentState) ErrorCode
//Cancel cancels processing of the given document
Cancel(docState contracts.DocumentState) ErrorCode
//TODO do we need to implement CancelAll?
//CancelAll()
}
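// A minimal lifecycle sketch (illustrative only; proc, docState, and the error
// handling below are assumptions, not part of this package):
//
//	resChan, _ := proc.Start()
//	_ = proc.InitialProcessing(true) // true: skip documents older than 48 hours
//	go func() {
//		for res := range resChan {
//			_ = res // forward the result to the upstream service
//		}
//	}()
//	if errorCode := proc.Submit(docState); errorCode != "" {
//		// e.g. CommandBufferFull: back off and retry, as pushPersistedDocToJobPool does
//	}
//	proc.Stop() // drains the pools, then closes resChan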
// EngineProcessor defines methods to process incoming documents by pushing them to the executer through job pools
type EngineProcessor struct {
context context.T
executerCreator ExecuterCreator
sendCommandPool task.Pool
cancelCommandPool task.Pool
//TODO this should be abstract as the Processor's domain
resChan chan contracts.DocumentResult
documentMgr docmanager.DocumentMgr
stopFlagMutex sync.RWMutex
isProcessorStopped bool
startWorker *workerProcessorSpec
cancelWorker *workerProcessorSpec
poolToProcessorErrorCodeMap map[task.PoolErrorCode]ErrorCode
}
// workerProcessorSpec contains properties and methods that specify worker-related configuration needed by the processor
type workerProcessorSpec struct {
workerLimit int
assignedDocType contracts.DocumentType
bufferLimit int
}
// GetAssignedDocType returns the assigned doc type
func (wps *workerProcessorSpec) GetAssignedDocType() contracts.DocumentType {
return wps.assignedDocType
}
// GetWorkerLimit returns the worker limit
func (wps *workerProcessorSpec) GetWorkerLimit() int {
return wps.workerLimit
}
// GetBufferLimit returns the worker buffer limit
func (wps *workerProcessorSpec) GetBufferLimit() int {
return wps.bufferLimit
}
// NewWorkerProcessorSpec returns a reference to a new worker processor specification object
func NewWorkerProcessorSpec(ctx context.T, workerLimit int, assignedDocType contracts.DocumentType, bufferLimit int) *workerProcessorSpec {
logger := ctx.Log()
workerProcessorSpecObj := &workerProcessorSpec{
workerLimit: workerLimit,
assignedDocType: assignedDocType,
bufferLimit: bufferLimit,
}
if workerLimit < 1 {
logger.Warnf("invalid worker limit %v, assigning default value of 5", workerLimit)
workerProcessorSpecObj.workerLimit = 5
}
// 0 as buffer limit blocks the buffer channel
// 0 is passed by the association module and the offline processor module
if bufferLimit < 0 {
logger.Infof("invalid buffer limit %v, assigning default value of 1", bufferLimit)
workerProcessorSpecObj.bufferLimit = 1
}
if assignedDocType == "" {
logger.Debug("empty doc type assigned, assigning placeholder doc type")
workerProcessorSpecObj.assignedDocType = "nodoctype" // dummy value
}
return workerProcessorSpecObj
}
// NewEngineProcessor returns a newly initialized EngineProcessor
// TODO worker pool should be triggered in the Start() function
// The supported document types indicate the domain of documents the Processor will run upon. There will be race conditions if multiple Processors share a domain.
func NewEngineProcessor(ctx context.T, startWorker *workerProcessorSpec, cancelWorker *workerProcessorSpec) *EngineProcessor {
engineProcessorCtx := ctx.With("[EngineProcessor]")
log := engineProcessorCtx.Log()
// sendCommand and cancelCommand will be processed by separate worker pools,
// so we can define the number of workers per each
cancelWaitDuration := 10000 * time.Millisecond
clock := times.DefaultClock
resChan := make(chan contracts.DocumentResult)
executerCreator := func(ctx context.T) executer.Executer {
return outofproc.NewOutOfProcExecuter(ctx)
}
documentMgr := docmanager.NewDocumentFileMgr(engineProcessorCtx, appconfig.DefaultDataStorePath, appconfig.DefaultDocumentRootDirName, appconfig.DefaultLocationOfState)
engineProcessor := &EngineProcessor{
context: engineProcessorCtx,
executerCreator: executerCreator,
resChan: resChan,
documentMgr: documentMgr,
sendCommandPool: task.NewPool(log, startWorker.workerLimit, startWorker.bufferLimit, cancelWaitDuration, clock),
cancelCommandPool: task.NewPool(log, cancelWorker.workerLimit, cancelWorker.bufferLimit, cancelWaitDuration, clock),
startWorker: startWorker,
cancelWorker: cancelWorker,
poolToProcessorErrorCodeMap: make(map[task.PoolErrorCode]ErrorCode),
}
engineProcessor.loadProcessorPoolErrorCodes()
return engineProcessor
}
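// Construction sketch (worker and buffer counts are illustrative; the document
// type constants are assumed to live in the contracts package):
//
//	startSpec := NewWorkerProcessorSpec(ctx, 5, contracts.SendCommand, 20)
//	cancelSpec := NewWorkerProcessorSpec(ctx, 1, contracts.CancelCommand, 20)
//	processor := NewEngineProcessor(ctx, startSpec, cancelSpec)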
func (p *EngineProcessor) Start() (resChan chan contracts.DocumentResult, err error) {
context := p.context
if context == nil {
return nil, fmt.Errorf("EngineProcessor is not initialized")
}
log := context.Log()
log.Debug("Starting")
resChan = p.resChan
return
}
func (p *EngineProcessor) InitialProcessing(skipDocumentIfExpired bool) (err error) {
context := p.context
if context == nil {
return fmt.Errorf("EngineProcessor is not initialized")
}
log := context.Log()
log.Debug("Initial processing")
// pending files are preloaded here to handle the following case:
// an in-progress document starts submission by being pushed to the pending state,
// which could cause the same document to be loaded again when processPendingDocuments is called
pendingFiles := p.getDocStateFiles(log, appconfig.DefaultLocationOfPending)
//prioritize the ongoing documents first
p.processInProgressDocuments(skipDocumentIfExpired)
//deal with the pending jobs that have not been picked up by a worker yet
p.processPendingDocuments(pendingFiles)
return
}
// checkDocSubmissionAllowed checks whether submission of the document to the given pool is allowed
func (p *EngineProcessor) checkDocSubmissionAllowed(docState *contracts.DocumentState, taskPool task.Pool, bufferLimit int) ErrorCode {
logger := p.context.Log()
tokenSize := taskPool.BufferTokensIssued()
logger.Debugf("buffer limit start value for doc type %v with command id %v: tokenSize - %v bufferLimit - %v", docState.DocumentType, docState.DocumentInformation.DocumentID, tokenSize, bufferLimit)
if bufferLimit == 0 { // No synchronization needed as this value is loaded during processor initialization
return "" // no check needed when the buffer limit is zero; this preserves the old behavior
}
if p.hasProcessorStopped() { // additional check to drop it at the beginning itself
return ClosedProcessor
}
jobId := p.getJobId(docState)
errorCode := taskPool.AcquireBufferToken(jobId)
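// the token acquired here is paired with a release: on any later submission failure or panic, cleanUpDocSubmissionOnError returns it to the pool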
if errorCode != "" {
if processorErrorCode, ok := p.poolToProcessorErrorCodeMap[errorCode]; ok {
return processorErrorCode
} else {
return ConversionFailed
}
}
tokenSize = taskPool.BufferTokensIssued()
// Success condition
logger.Debugf("buffer limit end value for doc type %v with command id %v: tokenSize - %v bufferLimit - %v", docState.DocumentType, docState.DocumentInformation.DocumentID, tokenSize, bufferLimit)
return "" // Success
}
// loadProcessorPoolErrorCodes loads processor pool error code mappings
func (p *EngineProcessor) loadProcessorPoolErrorCodes() {
p.poolToProcessorErrorCodeMap[task.InvalidJobId] = InvalidDocumentId
p.poolToProcessorErrorCodeMap[task.DuplicateCommand] = DuplicateCommand
p.poolToProcessorErrorCodeMap[task.JobQueueFull] = CommandBufferFull
}
// cleanUpDocSubmissionOnError releases the initially acquired buffer token
// call this function only after the token has been acquired successfully
func (p *EngineProcessor) cleanUpDocSubmissionOnError(doc *contracts.DocumentState) {
if doc.DocumentType == p.startWorker.assignedDocType && p.startWorker.bufferLimit > 0 { // do not call release token when buffer limit is zero
p.decrementCommandBuffer(doc, p.sendCommandPool)
} else if doc.DocumentType == p.cancelWorker.assignedDocType && p.cancelWorker.bufferLimit > 0 {
p.decrementCommandBuffer(doc, p.cancelCommandPool)
}
}
// decrementCommandBuffer releases the buffer token held for the given document
func (p *EngineProcessor) decrementCommandBuffer(doc *contracts.DocumentState, commandPool task.Pool) {
logger := p.context.Log()
// safety check
if doc == nil {
logger.Errorf("document is nil")
return
}
jobId := p.getJobId(doc)
errorCode := commandPool.ReleaseBufferToken(jobId)
tokenSize := commandPool.BufferTokensIssued()
logger.Debugf("current buffer size for doc type %v with command id %v: tokenSize: %v", doc.DocumentType, jobId, tokenSize)
// should not happen at any time
if errorCode != "" {
logger.Warnf("clean up failed because of the following error code %v", errorCode)
return
}
logger.Infof("cleaned up command %v with doc type %v", jobId, doc.DocumentType)
}
// Submit submits a document, in the form of a docState object, to the pool; results are streamed back on the channel returned by Start()
func (p *EngineProcessor) Submit(docState contracts.DocumentState) (errorCode ErrorCode) {
return p.submit(&docState, false)
}
// submit will send job to the sendCommandPool
func (p *EngineProcessor) submit(docState *contracts.DocumentState, isInProgressDocument bool) (errorCode ErrorCode) {
log := p.context.Log()
jobID := p.getJobId(docState)
// checks whether the document submission is allowed in the send command pool
// the duplicate command check also happens here
// when the buffer limit is zero, we always return success(""), which means the pool submit will block if the pool is already full
errorCode = p.checkProcessorSubmissionAllowed(docState)
if errorCode != "" {
return errorCode
}
log.Infof("document %v submission started", jobID)
defer log.Infof("document %v submission ended", jobID)
defer func() {
if r := recover(); r != nil {
errorCode = SubmissionPanic
p.cleanUpDocSubmissionOnError(docState) // call this function only after acquiring token successfully
log.Errorf("document %v submission panicked", jobID)
log.Errorf("stacktrace:\n%s", debug.Stack())
}
}()
if !isInProgressDocument {
p.documentMgr.PersistDocumentState(docState.DocumentInformation.DocumentID, appconfig.DefaultLocationOfPending, *docState)
}
//TODO this is a hack, in future jobID should be managed by Processing engine itself, instead of inferring from job's internal field
err := p.sendCommandPool.Submit(log, jobID, func(cancelFlag task.CancelFlag) {
processCommand(
p.context,
p.executerCreator,
cancelFlag,
p.resChan,
docState,
p.documentMgr)
})
if err != nil {
// currently, Duplicate command is the only error returned by the job pool
// * When the buffer is zero, we have no issues as we do not acquire/release tokens in the pool
// * When the buffer is > 0, we do acquire/release the token; in this case, checkProcessorSubmissionAllowed was already called at the beginning
// Listing all possible states of a document in the job pool to discuss the Duplicate command error:
// 1) The job is in the job queue buffer and not yet processed - not possible, as we do not receive commands already in the "job queue buffer"
// 2) The job has been released from the job queue buffer and has started processing - also not possible, as we do not receive commands already in the "job store"
p.cleanUpDocSubmissionOnError(docState)
log.Error("Document Submission failed: ", err)
//move the fail-to-submit document to corrupt folder
p.documentMgr.MoveDocumentState(docState.DocumentInformation.DocumentID, appconfig.DefaultLocationOfPending, appconfig.DefaultLocationOfCorrupt)
return "" // treated as a successful submission even though it failed; the document has already been moved to the corrupt folder
}
return "" // considered submission successful
}
// checkProcessorSubmissionAllowed checks whether submission of the document is allowed for its assigned worker pool
func (p *EngineProcessor) checkProcessorSubmissionAllowed(doc *contracts.DocumentState) ErrorCode {
if doc.DocumentType == p.startWorker.assignedDocType {
return p.checkDocSubmissionAllowed(doc, p.sendCommandPool, p.startWorker.bufferLimit)
} else if doc.DocumentType == p.cancelWorker.assignedDocType {
return p.checkDocSubmissionAllowed(doc, p.cancelCommandPool, p.cancelWorker.bufferLimit)
}
return UnsupportedDocType
}
// getJobId returns the job id of the document: the AssociationID for associations, the MessageID otherwise
func (p *EngineProcessor) getJobId(docState *contracts.DocumentState) string {
var jobID string
if docState.IsAssociation() {
jobID = docState.DocumentInformation.AssociationID
} else {
jobID = docState.DocumentInformation.MessageID
}
return jobID
}
// Cancel submits the cancel document to the cancelCommandPool, which is responsible for canceling the targeted command
func (p *EngineProcessor) Cancel(docState contracts.DocumentState) (errorCode ErrorCode) {
return p.cancel(&docState, false)
}
func (p *EngineProcessor) cancel(docState *contracts.DocumentState, isInProgressDocument bool) (errorCode ErrorCode) {
log := p.context.Log()
jobID := p.getJobId(docState)
log.Infof("document %v cancellation started", jobID)
defer log.Infof("document %v cancellation ended", jobID)
// checks whether the document submission is allowed in the cancel command pool
// the duplicate command check also happens here
// when the buffer limit is zero, we always return success(""), which means the pool submit will block if the pool is already full
errorCode = p.checkProcessorSubmissionAllowed(docState)
if errorCode != "" {
return errorCode
}
defer func() {
if r := recover(); r != nil {
errorCode = SubmissionPanic
p.cleanUpDocSubmissionOnError(docState) // call this function only after acquiring token successfully
log.Errorf("document %v submission panicked", jobID)
log.Errorf("stacktrace:\n%s", debug.Stack())
}
}()
if !isInProgressDocument {
p.documentMgr.PersistDocumentState(docState.DocumentInformation.DocumentID, appconfig.DefaultLocationOfPending, *docState)
}
err := p.cancelCommandPool.Submit(log, jobID, func(cancelFlag task.CancelFlag) {
processCancelCommand(p.context, p.sendCommandPool, docState, p.documentMgr)
})
if err != nil {
// currently, Duplicate command is the only error returned by the job pool
// * When the buffer is zero, we have no issues as we do not acquire/release tokens in the pool
// * When the buffer is > 0, we do acquire/release the token; in this case, checkProcessorSubmissionAllowed was already called at the beginning
// Listing all possible states of a document in the job pool to discuss the Duplicate command error:
// 1) The job is in the job queue buffer and not yet processed - not possible, as we do not receive commands already in the "job queue buffer"
// 2) The job has been released from the job queue buffer and has started processing - also not possible, as we do not receive commands already in the "job store"
p.cleanUpDocSubmissionOnError(docState)
log.Error("CancelCommand failed: ", err)
}
return ""
}
// hasProcessorStopped checks whether the processor has stopped
func (p *EngineProcessor) hasProcessorStopped() bool {
p.stopFlagMutex.RLock()
defer p.stopFlagMutex.RUnlock()
return p.isProcessorStopped
}
// hasProcessorStopCalledAlready reports whether Stop has already been called; the first call marks the processor as stopped and returns false
func (p *EngineProcessor) hasProcessorStopCalledAlready() bool {
p.stopFlagMutex.Lock()
defer p.stopFlagMutex.Unlock()
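// test-and-set under the write lock: only the first caller observes false; every later caller gets true, which makes Stop idempotent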
if p.isProcessorStopped {
return true
}
p.isProcessorStopped = true
return false
}
// Stop sets the cancel flags of all running jobs; the command workers observe these flags and shut down gracefully
func (p *EngineProcessor) Stop() {
if p.hasProcessorStopCalledAlready() {
p.context.Log().Info("Processor stop called already")
return
}
waitTimeout := time.Duration(p.context.AppConfig().Mds.StopTimeoutMillis) * time.Millisecond
var wg sync.WaitGroup
// shutdown the send command pool in a separate go routine
wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
p.context.Log().Errorf("Shutdown send command pool panic: %v", r)
p.context.Log().Errorf("Stacktrace:\n%s", debug.Stack())
}
}()
defer wg.Done()
p.sendCommandPool.ShutdownAndWait(waitTimeout)
}()
// shutdown the cancel command pool in a separate go routine
wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
p.context.Log().Errorf("Shutdown cancel command pool panic: %v", r)
p.context.Log().Errorf("Stacktrace:\n%s", debug.Stack())
}
}()
defer wg.Done()
p.cancelCommandPool.ShutdownAndWait(waitTimeout)
}()
// wait for everything to shut down
wg.Wait()
// close the receiver channel only after we're sure all the ongoing jobs are stopped and no sender is on this channel
close(p.resChan)
p.context.Log().Info("processor closed")
}
// TODO remove the direct file dependency once we encapsulate docmanager package
func (p *EngineProcessor) processPendingDocuments(files []os.FileInfo) {
log := p.context.Log()
//iterate through all pending messages
for _, f := range files {
log.Infof("Found pending document - %v", f.Name())
//inspect document state
docState := p.documentMgr.GetDocumentState(f.Name(), appconfig.DefaultLocationOfPending)
if p.isSupportedDocumentType(docState.DocumentType) {
p.pushPersistedDocToJobPool(docState, appconfig.DefaultLocationOfPending, false)
}
}
}
func (p *EngineProcessor) getDocStateFiles(log log.T, docStateDir string) []os.FileInfo {
var files []os.FileInfo
instanceID, err := p.context.Identity().ShortInstanceID()
if err != nil {
log.Errorf("Failed to get short instanceID while processing %v documents: %v", docStateDir, err)
return files
}
// process older documents from state folder
docsLocation := docmanager.DocumentStateDir(instanceID, docStateDir)
if isDirectoryEmpty, _ := fileutil.IsDirEmpty(docsLocation); isDirectoryEmpty {
log.Debugf("No %v documents to process from %v", docStateDir, docsLocation)
return files
}
//get all messages
if files, err = fileutil.ReadDir(docsLocation); err != nil {
log.Errorf("skipping reading %v documents from %v. unexpected error encountered - %v", docStateDir, docsLocation, err)
}
return files
}
// processInProgressDocuments processes InProgress documents that have already been dequeued and entered the job pool
func (p *EngineProcessor) processInProgressDocuments(skipDocumentIfExpired bool) {
log := p.context.Log()
config := p.context.AppConfig()
files := p.getDocStateFiles(log, appconfig.DefaultLocationOfCurrent)
//iterate through all InProgress docs
for _, f := range files {
log.Infof("Found in-progress document - %v", f.Name())
//inspect document state
docState := p.documentMgr.GetDocumentState(f.Name(), appconfig.DefaultLocationOfCurrent)
if p.isSupportedDocumentType(docState.DocumentType) {
retryLimit := config.Mds.CommandRetryLimit
if docState.DocumentInformation.RunCount >= retryLimit {
p.documentMgr.MoveDocumentState(f.Name(), appconfig.DefaultLocationOfCurrent, appconfig.DefaultLocationOfCorrupt)
continue
}
// increment the command run count
docState.DocumentInformation.RunCount++
p.documentMgr.PersistDocumentState(docState.DocumentInformation.DocumentID, appconfig.DefaultLocationOfCurrent, docState)
log.Infof("Processing in-progress document %v", docState.DocumentInformation.DocumentID)
if skipDocumentIfExpired && docState.DocumentInformation.CreatedDate != "" {
createDate := times.ParseIso8601UTC(docState.DocumentInformation.CreatedDate)
// Do not resume an in-progress document if its creation date is more than 48 hours old.
if createDate.Add(maxDocumentTimeOutHour).Before(time.Now().UTC()) {
log.Infof("Document %v expired %v, skipping", docState.DocumentInformation.DocumentID, docState.DocumentInformation.CreatedDate)
p.documentMgr.MoveDocumentState(f.Name(), appconfig.DefaultLocationOfCurrent, appconfig.DefaultLocationOfCorrupt)
continue
}
}
//Submit the work to Job Pool so that we don't block for processing of new messages
p.pushPersistedDocToJobPool(docState, appconfig.DefaultLocationOfCurrent, true)
}
}
}
// pushPersistedDocToJobPool pushes in-progress and pending documents to job pool during restart
func (p *EngineProcessor) pushPersistedDocToJobPool(docState contracts.DocumentState, docStateDir string, isInProgress bool) {
logger := p.context.Log()
// safety check
defer func() {
if r := recover(); r != nil {
p.cleanUpDocSubmissionOnError(&docState)
logger.Errorf("submitting to processor panicked %v %v", docState.DocumentInformation.DocumentID, r)
logger.Errorf("stacktrace:\n%s", debug.Stack())
}
}()
logger.Infof("Processing document %v from state dir %v", docState.DocumentInformation.DocumentID, docStateDir)
for {
var processorErrorCode ErrorCode
if docState.DocumentType == p.startWorker.assignedDocType {
processorErrorCode = p.submit(&docState, isInProgress)
} else if docState.DocumentType == p.cancelWorker.assignedDocType {
processorErrorCode = p.cancel(&docState, isInProgress)
}
if processorErrorCode == CommandBufferFull { // sleep only when the command buffer is full
logger.Debugf("pausing submission of document %v for a second because of error code %v", docState.DocumentInformation.DocumentID, processorErrorCode)
time.Sleep(time.Second)
continue
}
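// note: there is no retry cap; submission is retried every second until the buffer frees up or a different error code is returned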
if processorErrorCode != "" { // all errors except CommandBufferFull
logger.Warnf("skipping in-progress document %v because of error code %v", docState.DocumentInformation.DocumentID, processorErrorCode)
}
break // break iteration for success and errors other than CommandBufferFull
}
}
// isSupportedDocumentType returns whether the processor supports the document
func (p *EngineProcessor) isSupportedDocumentType(documentType contracts.DocumentType) bool {
if documentType != "" {
if p.startWorker.assignedDocType == documentType || p.cancelWorker.assignedDocType == documentType {
return true
}
}
return false
}
func processCommand(context context.T, executerCreator ExecuterCreator, cancelFlag task.CancelFlag, resChan chan contracts.DocumentResult, docState *contracts.DocumentState, docMgr docmanager.DocumentMgr) {
log := context.Log()
//mark the document as in progress by moving it from the pending folder to the current folder
docMgr.MoveDocumentState(
docState.DocumentInformation.DocumentID,
appconfig.DefaultLocationOfPending,
appconfig.DefaultLocationOfCurrent)
log.Debug("Running executer...")
documentID := docState.DocumentInformation.DocumentID
messageID := docState.DocumentInformation.MessageID
e := executerCreator(context)
docStore := executer.NewDocumentFileStore(documentID, appconfig.DefaultLocationOfCurrent, docState, docMgr, true)
statusChan := e.Run(
cancelFlag,
&docStore,
)
// Stream status updates back to the service; the final result is inspected below for shutdown/reboot handling
var final *contracts.DocumentResult
for res := range statusChan {
func() {
defer func() {
if err := recover(); err != nil {
log.Errorf("Failed to process status for document %s with error %v", documentID, err)
log.Errorf("Stacktrace:\n%s", debug.Stack())
}
}()
if res.LastPlugin == "" {
log.Infof("sending document: %v complete response", documentID)
} else {
log.Debugf("sending reply for plugin update: %v", res.LastPlugin)
}
final = &res
handleCloudwatchPlugin(context, res.PluginResults, documentID)
// replies from workers arrive without UpstreamServiceName populated;
// whenever we receive a response, we populate it with the appropriate upstream service
// this is done here to avoid changes in the workers
res.UpstreamServiceName = docState.UpstreamServiceName
// used to add topic to the payload in agent reply message in MGS interactor
res.RelatedDocumentType = docState.DocumentType
//hand off the message to Service
resChan <- res
log.Debugf("Process status for document %v done", documentID)
}()
}
//TODO add shutdown as API call, move cancelFlag out of task pool; cancelFlag to contracts, nobody else above runplugins needs to create cancelFlag.
// Shutdown/reboot detection
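// This function ends in one of three ways:
// 1) final is nil or mid-plugin: the agent is shutting down, so the interim state stays in the current folder for resume
// 2) the document finished with SuccessAndReboot: the interim state is kept and a pending reboot is requested
// 3) the document completed normally: its interim state is removed from the current folder below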
if final == nil || final.LastPlugin != "" {
log.Infof("document %v still in progress, shutting down...", messageID)
return
} else if final.Status == contracts.ResultStatusSuccessAndReboot {
log.Infof("document %v requested reboot, need to resume", messageID)
rebooter.RequestPendingReboot(context.Log())
return
}
//the document reached a terminal state; clean up its interim state from the current folder
log.Infof("execution of %v is over. Removing interimState from current folder", messageID)
docMgr.RemoveDocumentState(
documentID,
appconfig.DefaultLocationOfCurrent)
}
// TODO CancelCommand is currently treated as a special type of Command by the Processor, but in general Cancel operation should be seen as a probe to existing commands
func processCancelCommand(context context.T, sendCommandPool task.Pool, docState *contracts.DocumentState, docMgr docmanager.DocumentMgr) {
log := context.Log()
//move the cancel document from the pending folder to the current folder
docMgr.MoveDocumentState(
docState.DocumentInformation.DocumentID,
appconfig.DefaultLocationOfPending, appconfig.DefaultLocationOfCurrent)
log.Debugf("Canceling job with id %v...", docState.CancelInformation.CancelMessageID)
if found := sendCommandPool.Cancel(docState.CancelInformation.CancelMessageID); !found {
log.Debugf("Job with id %v not found (possibly completed)", docState.CancelInformation.CancelMessageID)
docState.CancelInformation.DebugInfo = fmt.Sprintf("Command %v couldn't be cancelled", docState.CancelInformation.CancelCommandID)
docState.DocumentInformation.DocumentStatus = contracts.ResultStatusFailed
} else {
docState.CancelInformation.DebugInfo = fmt.Sprintf("Command %v cancelled", docState.CancelInformation.CancelCommandID)
docState.DocumentInformation.DocumentStatus = contracts.ResultStatusSuccess
}
//the cancel document reached a terminal state; clean up its interim state from the current folder
log.Debugf("Execution of %v is over. Removing interimState file from Current folder", docState.DocumentInformation.MessageID)
docMgr.RemoveDocumentState(
docState.DocumentInformation.DocumentID,
appconfig.DefaultLocationOfCurrent)
}
// TODO remove this once CloudWatch plugin is reworked
// temporary solution on plugins with shared responsibility with agent
func handleCloudwatchPlugin(context context.T, pluginResults map[string]*contracts.PluginResult, documentID string) {
log := context.Log()
instanceID, _ := context.Identity().InstanceID()
//TODO once association service switches to use RC and CW goes away, remove this block
for ID, pluginRes := range pluginResults {
if pluginRes.PluginName == appconfig.PluginNameCloudWatch {
log.Infof("Found %v to invoke lrpm invoker", pluginRes.PluginName)
orchestrationRootDir := filepath.Join(
appconfig.DefaultDataStorePath,
instanceID,
appconfig.DefaultDocumentRootDirName,
context.AppConfig().Agent.OrchestrationRootDir)
orchestrationDir := fileutil.BuildPath(orchestrationRootDir, documentID)
manager.Invoke(context, ID, pluginRes, orchestrationDir)
}
}
}