agent/runcommand/coremodule.go (181 lines of code) (raw):
// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package runcommand implements runcommand core processing module
package runcommand
import (
"fmt"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/aws/amazon-ssm-agent/agent/contracts"
"github.com/aws/amazon-ssm-agent/agent/framework/docmanager"
mdsService "github.com/aws/amazon-ssm-agent/agent/runcommand/mds"
"github.com/aws/amazon-ssm-agent/agent/sdkutil"
"github.com/aws/aws-sdk-go/service/ssmmds"
"github.com/carlescere/scheduler"
)
const (
// NOTE(review): the five string constants below are not referenced in the
// visible part of this file; judging by their values they are presumably
// lookup keys / plugin names used when parsing documents - confirm against
// the code that uses them.
documentContent = "DocumentContent"
runtimeConfig = "runtimeConfig"
cloudwatchPlugin = "aws:cloudWatch"
properties = "properties"
parameters = "Parameters"
// documentLevelTimeOutDurationHour is the document-level timeout in hours.
// The MDS service marks a document as timed out if it does not receive any
// response from the agent within 2 hours; isValidReplyRequest compares the
// age of a saved reply against this value.
documentLevelTimeOutDurationHour = 2
)
// Package-level state of the run command module.
var (
	// singletonMapOfUnsupportedSSMDocs and once are not referenced in this part
	// of the file.
	// NOTE(review): presumably once guards the one-time initialization of the
	// map - confirm against the code that populates it.
	singletonMapOfUnsupportedSSMDocs map[string]bool
	once                             sync.Once

	// loadDocStateFromSendCommand and loadDocStateFromCancelCommand are the
	// parsers processMessage calls to turn a send-command or cancel-command
	// message into a document state. They are held in variables rather than
	// called directly.
	// NOTE(review): presumably so tests can substitute them - confirm.
	loadDocStateFromSendCommand   = parseSendCommandMessage
	loadDocStateFromCancelCommand = parseCancelCommandMessage
)
// ModuleName returns the module name, i.e. the value stored in the service's
// name field.
func (s *RunCommandService) ModuleName() string {
return s.name
}
// ModuleExecute starts the document processor and schedules the recurring
// message-poll and send-reply jobs.
//
// It returns an error when the processor cannot be started, when initial
// processing fails, or when either job cannot be scheduled. Both scheduling
// attempts are always made. If both fail, the send-reply error is returned;
// if only the message poll job fails, its error is returned instead of being
// masked by the subsequent successful send-reply scheduling.
//
// A panic raised while executing is recovered and logged together with its
// stack trace; the named result keeps whatever value it had at that point.
func (s *RunCommandService) ModuleExecute() (err error) {
	log := s.context.Log()
	defer func() {
		if msg := recover(); msg != nil {
			log.Errorf("run command ModuleExecute run panic: %v", msg)
			log.Errorf("Stacktrace:\n%s", debug.Stack())
		}
	}()

	log.Info("Starting document processing engine...")
	var resultChan chan contracts.DocumentResult
	if resultChan, err = s.processor.Start(); err != nil {
		log.Errorf("unable to start document processor: %v", err)
		return err
	}
	// Consume the processor's results in the background; listenReply returns
	// once resultChan is closed.
	go s.listenReply(resultChan)

	if err = s.processor.InitialProcessing(true); err != nil {
		log.Errorf("initial processing in EngineProcessor encountered error: %v", err)
		return err
	}

	log.Info("Scheduling message polling")
	s.messagePollWaitGroup = &sync.WaitGroup{}
	// Keep the poll scheduling error in its own variable so the send-reply
	// scheduling below cannot overwrite it.
	var pollErr error
	if s.messagePollJob, pollErr = scheduler.Every(pollMessageFrequencyMinutes).Minutes().Run(s.messagePollLoop); pollErr != nil {
		log.Errorf("unable to schedule message poll job. %v", pollErr)
	}

	log.Info("Starting send replies to MDS")
	if s.sendReplyJob, err = scheduler.Every(sendReplyFrequencyMinutes).Minutes().Run(s.sendReplyLoop); err != nil {
		log.Errorf("unable to schedule send reply job. %v", err)
	}

	// Surface the poll scheduling failure when the send-reply job scheduled fine.
	if err == nil {
		err = pollErr
	}
	return err
}
// ModuleStop shuts the run command service down in two steps: it calls
// s.stop() first and then stops the document processor. The order matters,
// see the inline comments. It always returns nil.
func (s *RunCommandService) ModuleStop() (err error) {
//first stop sending failed replies to the service and the message poller
s.stop()
//second stop the message processor
s.processor.Stop()
return nil
}
// listenReply consumes document results from resultChan until the channel is
// closed and forwards every result to the service through sendResponse.
//
// A result with a non-empty LastPlugin is a per-plugin update. A result with
// an empty LastPlugin marks the command as complete and additionally triggers
// an asynchronous cleanup of old orchestration directories.
//
// Each result is handled inside its own closure so that a panic while handling
// one result is recovered and logged without ending the loop.
func (s *RunCommandService) listenReply(resultChan chan contracts.DocumentResult) {
	log := s.context.Log()
	// The processor guarantees to close this channel upon stop, which ends the loop.
	for res := range resultChan {
		func() {
			defer func() {
				if err := recover(); err != nil {
					log.Errorf("Failed to process replies with error %v", err)
					log.Errorf("Stacktrace:\n%s", debug.Stack())
				}
			}()

			if res.LastPlugin != "" {
				log.Infof("received plugin: %v result from Processor", res.LastPlugin)
			} else {
				log.Infof("command: %v complete", res.MessageID)
				// Delete old log files now that the execution is over and its files
				// have been moved to the completed folder. This cleans completed
				// document state files and orchestration dirs, and only takes care of
				// files generated by RunCommand in the folder.
				shortInstanceID, idErr := s.context.Identity().ShortInstanceID()
				if idErr != nil {
					// Report the failure instead of dropping it silently. The cleanup is
					// still started, exactly as before, with whatever id was returned.
					log.Errorf("unable to get short instance id for orchestration cleanup: %v", idErr)
				}
				go docmanager.DeleteOldOrchestrationDirectories(log,
					shortInstanceID,
					s.context.AppConfig().Agent.OrchestrationRootDir,
					s.context.AppConfig().Ssm.RunCommandLogsRetentionDurationHours,
					s.context.AppConfig().Ssm.AssociationLogsRetentionDurationHours)
			}

			s.sendResponse(res.MessageID, res)
		}()
	}
}
// processMessage handles a single message received from MDS.
//
// The message is validated, parsed into a document state according to its
// topic (send command or cancel command), acknowledged, reported as
// InProgress and finally submitted to, or cancelled on, the processor.
//
// Failure handling:
//   - nil message, nil message id or a validate error: logged and ignored.
//   - send-command parse error: a document-level Failed response is sent.
//   - cancel-command parse error or unexpected topic: the message is failed
//     with InternalHandlerException.
//   - acknowledge error: handed to sdkutil.HandleAwsError; the document is
//     not processed.
func (s *RunCommandService) processMessage(msg *ssmmds.Message) {
	var (
		docState *contracts.DocumentState
		err      error
	)

	// The message id is dereferenced below to build the logger, before validate
	// gets a chance to run, so guard against a nil message / nil id here rather
	// than panicking.
	if msg == nil || msg.MessageId == nil {
		s.context.Log().Error("message not valid, ignoring: message or message id is nil")
		return
	}

	// create separate logger that includes messageID with every log message
	context := s.context.With("[messageID=" + *msg.MessageId + "]")
	log := context.Log()
	log.Debug("Processing message")

	// NOTE(review): the Topic dereferences below rely on validate rejecting
	// messages whose Topic is nil - confirm against validate.
	if err = validate(msg); err != nil {
		log.Error("message not valid, ignoring: ", err)
		return
	}

	if strings.HasPrefix(*msg.Topic, string(SendCommandTopicPrefix)) {
		docState, err = loadDocStateFromSendCommand(context, msg, s.orchestrationRootDir)
		if err != nil {
			log.Error(err)
			s.sendDocLevelResponse(*msg.MessageId, contracts.ResultStatusFailed, err.Error())
			return
		}
	} else if strings.HasPrefix(*msg.Topic, string(CancelCommandTopicPrefix)) {
		docState, err = loadDocStateFromCancelCommand(context, msg, s.orchestrationRootDir)
	} else {
		err = fmt.Errorf("unexpected topic name %v", *msg.Topic)
	}

	// Reached with a non-nil err only for a cancel-command parse failure or an
	// unexpected topic; send-command failures returned above.
	if err != nil {
		log.Error("format of received message is invalid ", err)
		if err = s.service.FailMessage(log, *msg.MessageId, mdsService.InternalHandlerException); err != nil {
			sdkutil.HandleAwsError(log, err, s.processorStopPolicy)
		}
		return
	}

	if err = s.service.AcknowledgeMessage(log, *msg.MessageId); err != nil {
		sdkutil.HandleAwsError(log, err, s.processorStopPolicy)
		return
	}
	log.Debugf("Ack done. Received message - messageId - %v", *msg.MessageId)

	log.Debugf("Processing to send a reply to update the document status to InProgress")
	//TODO This function should be called in service when it submits the document to the engine
	s.sendDocLevelResponse(*msg.MessageId, contracts.ResultStatusInProgress, "")
	log.Debugf("SendReply done. Received message - messageId - %v", *msg.MessageId)

	switch docState.DocumentType {
	case contracts.SendCommandOffline, contracts.SendCommand:
		s.processor.Submit(*docState)
	case contracts.CancelCommandOffline, contracts.CancelCommand:
		s.processor.Cancel(*docState)
	default:
		log.Error("unexpected document type ", docState.DocumentType)
	}
}
// sendFailedReplies retries replies that previously failed to reach the
// service. It loads the saved replies, deletes those isValidReplyRequest
// rejects, and sends the rest again. A reply is deleted once it has been sent
// successfully; one that cannot be loaded is skipped. The first send failure
// is handed to sdkutil.HandleAwsError and ends the run, leaving the remaining
// replies in place.
func (s *RunCommandService) sendFailedReplies() {
	log := s.context.Log()
	log.Debug("Checking if there are document replies that failed to reach the service, and retry sending them")

	pending := s.service.LoadFailedReplies(log)
	if len(pending) == 0 {
		log.Debugf("No failed document replies found")
		return
	}

	log.Infof("Found document replies that need to be sent to the service")
	for _, name := range pending {
		log.Debug("Loading reply ", name)

		// Replies past the document timeout are dropped instead of resent.
		if !isValidReplyRequest(name) {
			log.Debug("Reply is old, document execution must have timed out. Deleting the reply")
			s.service.DeleteFailedReply(log, name)
			continue
		}

		request, loadErr := s.service.GetFailedReply(log, name)
		if loadErr != nil {
			log.Error("Couldn't load the reply from disk ", loadErr)
			continue
		}

		log.Info("Sending reply ", name)
		if sendErr := s.service.SendReplyWithInput(log, request); sendErr != nil {
			sdkutil.HandleAwsError(log, sendErr, s.processorStopPolicy)
			// Stop at the first failed send; the remaining replies are not attempted.
			return
		}

		log.Infof("Sending reply %v succeeded, deleting the reply file from disk", name)
		s.service.DeleteFailedReply(log, name)
	}
}
// isValidReplyRequest reports whether a saved sendReply request is still worth
// sending, based on the timestamp embedded in its file name.
//
// The file name is split on "_" and its second field must be a timestamp in
// the layout "2006-01-02T15-04-05". The timestamp is parsed without a zone and
// compared against the current time.
//
// It returns false when the name has no second field, when that field is not
// a valid timestamp, or when the timestamp is more than
// documentLevelTimeOutDurationHour hours old (the document must have timed out
// by then). It returns true otherwise.
func isValidReplyRequest(filename string) bool {
	parts := strings.Split(filename, "_")
	if len(parts) < 2 {
		return false
	}

	createdAt, err := time.Parse("2006-01-02T15-04-05", parts[1])
	if err != nil {
		// A reply whose timestamp cannot be read is rejected explicitly, rather
		// than relying on the zero time looking very old.
		return false
	}

	return time.Since(createdAt).Hours() <= documentLevelTimeOutDurationHour
}