agent/framework/coremanager/coremanager.go (225 lines of code) (raw):
// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package coremanager encapsulates the logic for configuring, starting and stopping core modules
package coremanager
import (
"path/filepath"
"runtime/debug"
"sync"
"time"
"github.com/aws/amazon-ssm-agent/agent/agentlogstocloudwatch/cloudwatchlogspublisher"
"github.com/aws/amazon-ssm-agent/agent/appconfig"
"github.com/aws/amazon-ssm-agent/agent/context"
"github.com/aws/amazon-ssm-agent/agent/fileutil"
"github.com/aws/amazon-ssm-agent/agent/framework/coremodules"
"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer/plugin"
"github.com/aws/amazon-ssm-agent/agent/framework/runpluginutil"
logger "github.com/aws/amazon-ssm-agent/agent/log"
"github.com/aws/amazon-ssm-agent/agent/rebooter"
)
const (
	// softStopTimeout is the timeout passed to the core modules when they are
	// stopped ahead of a reboot (see watchForReboot).
	softStopTimeout = 15 * time.Second

	// hardStopTimeout is the timeout passed to the core modules when the agent
	// calls Stop.
	hardStopTimeout = 5 * time.Second
)
// ICoreManager is the lifecycle contract of a core module manager: it can
// start the registered core modules and later ask them to stop.
// *CoreManager in this file provides both methods.
type ICoreManager interface {
// Start executes the registered core modules
Start()
// Stop requests the core modules to stop executing
Stop()
}
// CoreManager encapsulates the logic for configuring, starting and stopping core modules
type CoreManager struct {
// context is the agent context the manager takes its logger from;
// NewCoreManager stores a copy tagged with "[instanceID=<short id>]".
context context.T
// coreModules is the registry of modules that Start executes and that are
// asked to stop on Stop or before a reboot.
coreModules coremodules.ModuleRegistry
// cloudwatchPublisher is the publisher NewCoreManager calls Init on; it is
// not otherwise used by the methods in this file.
cloudwatchPublisher *cloudwatchlogspublisher.CloudWatchPublisher
// rebooter supplies the channel of reboot requests and performs the
// machine reboot (see watchForReboot).
rebooter rebooter.IRebootType
}
// bookkeepingInitError is the error NewCoreManager returns when the
// bookkeeping folders could not be created. It is a distinct type so callers
// can recognise this failure with errors.As if they need to.
type bookkeepingInitError struct{}

// Error implements the error interface.
func (bookkeepingInitError) Error() string {
	return "unable to initialize bookkeeping locations"
}

// NewCoreManager creates a new core module manager.
//
// Before building the manager it:
//   - resolves the short instance ID from the context's identity,
//   - calls fileutil.HardenDataFolder for the SSM data folder,
//   - creates the bookkeeping folders via initializeBookkeepingLocations,
//   - calls Init on the CloudWatch publisher, and
//   - populates runpluginutil.SSMPluginRegistry with the registered worker
//     plugins.
//
// The stored context is tagged with "[instanceID=<short id>]".
//
// On any failure the returned manager is nil and err is non-nil. In
// particular a bookkeeping-folder failure yields a bookkeepingInitError
// rather than a nil manager paired with a nil error.
func NewCoreManager(context context.T, mr coremodules.ModuleRegistry, cwp *cloudwatchlogspublisher.CloudWatchPublisher, rbt rebooter.IRebootType) (cm *CoreManager, err error) {
	log := context.Log()

	shortInstanceID, err := context.Identity().ShortInstanceID()
	if err != nil {
		log.Errorf("error fetching the ShortInstanceID: %v", err)
		return nil, err
	}

	if err = fileutil.HardenDataFolder(log); err != nil {
		log.Errorf("error initializing SSM data folder with hardened ACL, %v", err)
		return nil, err
	}

	// Initialize all folders where interim states of executing commands will be stored.
	if !initializeBookkeepingLocations(log, shortInstanceID) {
		log.Error("unable to initialize. Exiting")
		// err is nil at this point, so an explicit error must be returned;
		// otherwise the caller would receive (nil, nil).
		return nil, bookkeepingInitError{}
	}

	// Initialize the client diagnostics
	cwp.Init()

	context = context.With("[instanceID=" + shortInstanceID + "]")
	runpluginutil.SSMPluginRegistry = plugin.RegisteredWorkerPlugins(context)

	return &CoreManager{
		context:             context,
		coreModules:         mr,
		cloudwatchPublisher: cwp,
		rebooter:            rbt,
	}, nil
}
// initializeBookkeepingLocations creates every folder the agent uses for
// bookkeeping under appconfig.DefaultDataStorePath/<shortInstanceID>:
//   - the document state folders (pending, current, completed, corrupt),
//   - the long running plugin data store and health check folders,
//   - the MDS and MGS reply folders, and
//   - the inventory root plus its custom, file and role sub-folders.
//
// The legacy "completed" state folder is deleted before being recreated.
//
// It returns true only when every folder was created. A failure on one of
// the document state folders stops the creation of the remaining state
// folders, but all other locations are still attempted so that every failure
// gets logged.
func initializeBookkeepingLocations(log logger.T, shortInstanceID string) bool {
	//TODO: initializations for all state tracking folders of core modules should be moved inside the corresponding core modules.

	// Create folders pending, current, completed, corrupt under the document state location.
	log.Info("Initializing bookkeeping folders")
	initStatus := true

	stateFolders := []string{
		appconfig.DefaultLocationOfPending,
		appconfig.DefaultLocationOfCurrent,
		appconfig.DefaultLocationOfCompleted,
		appconfig.DefaultLocationOfCorrupt,
	}
	for _, folder := range stateFolders {
		directoryName := filepath.Join(appconfig.DefaultDataStorePath,
			shortInstanceID,
			appconfig.DefaultDocumentRootDirName,
			appconfig.DefaultLocationOfState,
			folder)

		// legacy dir, unused: drop whatever it still contains (best effort)
		// before it is recreated below.
		if folder == appconfig.DefaultLocationOfCompleted {
			log.Info("removing the completed state files")
			fileutil.DeleteDirectory(directoryName)
		}

		if err := fileutil.MakeDirs(directoryName); err != nil {
			log.Errorf("Encountered error while creating folders for internal state management. %v", err)
			initStatus = false
			break
		}
	}

	// Remaining locations, created in this order. Each entry carries the
	// message logged before the attempt, the purpose used in the failure
	// message, and the directory to create.
	locations := []struct {
		announce string
		purpose  string
		path     string
	}{
		{
			announce: "Initializing bookkeeping folders for long running plugins",
			purpose:  "internal state management for long running plugins",
			path: filepath.Join(appconfig.DefaultDataStorePath,
				shortInstanceID,
				appconfig.LongRunningPluginsLocation,
				appconfig.LongRunningPluginDataStoreLocation),
		},
		{
			announce: "Initializing replies folder for MDS reply requests that couldn't reach the service",
			purpose:  "MDS replies",
			path: filepath.Join(appconfig.DefaultDataStorePath,
				shortInstanceID,
				appconfig.RepliesRootDirName),
		},
		{
			announce: "Initializing replies folder for MGS reply requests that couldn't reach the service",
			purpose:  "MGS replies",
			path: filepath.Join(appconfig.DefaultDataStorePath,
				shortInstanceID,
				appconfig.RepliesMGSRootDirName),
		},
		{
			announce: "Initializing healthcheck folders for long running plugins",
			purpose:  "health check for long running plugins",
			path: filepath.Join(appconfig.DefaultDataStorePath,
				shortInstanceID,
				appconfig.LongRunningPluginsLocation,
				appconfig.LongRunningPluginsHealthCheck),
		},
		{
			announce: "Initializing locations for inventory plugin",
			purpose:  "inventory plugin",
			path: filepath.Join(appconfig.DefaultDataStorePath,
				shortInstanceID,
				appconfig.InventoryRootDirName),
		},
		{
			announce: "Initializing default location for custom inventory",
			purpose:  "custom inventory",
			path: filepath.Join(appconfig.DefaultDataStorePath,
				shortInstanceID,
				appconfig.InventoryRootDirName,
				appconfig.CustomInventoryRootDirName),
		},
		{
			announce: "Initializing default location for file inventory",
			purpose:  "file inventory",
			path: filepath.Join(appconfig.DefaultDataStorePath,
				shortInstanceID,
				appconfig.InventoryRootDirName,
				appconfig.FileInventoryRootDirName),
		},
		{
			announce: "Initializing default location for role inventory",
			purpose:  "role inventory",
			path: filepath.Join(appconfig.DefaultDataStorePath,
				shortInstanceID,
				appconfig.InventoryRootDirName,
				appconfig.RoleInventoryRootDirName),
		},
	}

	for _, loc := range locations {
		log.Info(loc.announce)
		if err := fileutil.MakeDirs(loc.path); err != nil {
			// Errorf with an explicit separator keeps the error text from
			// being glued directly onto the end of the message.
			log.Errorf("encountered error while creating folders for %s: %v", loc.purpose, err)
			initStatus = false
		}
	}

	return initStatus
}
// Start executes the registered core modules while watching for reboot request.
// The reboot watcher runs in its own goroutine, and executeCoreModules launches
// each module in a goroutine as well, so Start returns without waiting for the
// modules to finish.
func (c *CoreManager) Start() {
go c.watchForReboot()
c.executeCoreModules()
}
// Stop requests the core modules to stop executing.
// Stop would be called by the agent and should be treated as hard stop, so the
// modules are given hardStopTimeout. The call blocks until every module's
// stop request has returned.
func (c *CoreManager) Stop() {
c.stopCoreModules(hardStopTimeout)
}
// executeCoreModules launches all the core modules, each in its own
// goroutine, and returns without waiting for any of them. A module that
// returns an error or panics is logged; the other modules are unaffected.
func (c *CoreManager) executeCoreModules() {
	// launch runs a single module, converting a panic into log entries so one
	// misbehaving module cannot bring down the process.
	launch := func(idx int) {
		defer func() {
			cause := recover()
			if cause == nil {
				return
			}
			c.context.Log().Errorf("Execute core modules panic: %v", cause)
			c.context.Log().Errorf("Stacktrace:\n%s", debug.Stack())
		}()

		module := c.coreModules[idx]
		err := module.ModuleExecute()
		if err == nil {
			return
		}
		c.context.Log().Errorf("error occurred trying to start core module. Plugin name: %v. Error: %v",
			module.ModuleName(),
			err)
	}

	for idx := range c.coreModules {
		go launch(idx)
	}
}
// stopCoreModules asks every core module to stop, passing each the given
// timeout. The stop requests are issued concurrently, one goroutine per
// module, and the call returns once all of them have returned. Errors and
// panics raised by a module are logged and do not affect the other modules.
func (c *CoreManager) stopCoreModules(timeout time.Duration) {
	log := c.context.Log()
	log.Infof("core manager stop requested. timeout: %v", timeout)

	var pending sync.WaitGroup

	// halt stops a single module. The panic handler is deferred after Done,
	// so it runs first and the wait group is released last.
	halt := func(idx int) {
		defer pending.Done()
		defer func() {
			cause := recover()
			if cause == nil {
				return
			}
			log.Errorf("Core module stop request panic: %v", cause)
			log.Errorf("Stacktrace:\n%s", debug.Stack())
		}()

		module := c.coreModules[idx]
		err := module.ModuleStop(timeout)
		if err == nil {
			return
		}
		log.Errorf("Plugin (%v) failed to stop with error: %v",
			module.ModuleName(),
			err)
	}

	for idx := range c.coreModules {
		pending.Add(1)
		go halt(idx)
	}
	pending.Wait()
}
// watchForReboot blocks until a value arrives on the rebooter's channel.
// For a RebootRequestTypeReboot request it stops the core modules with
// softStopTimeout and then asks the rebooter to reboot the machine; any other
// request type is logged as unsupported. Only the first value received is
// handled. A panic anywhere in this flow is recovered and logged.
func (c *CoreManager) watchForReboot() {
	log := c.context.Log()
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		log.Errorf("Watch for reboot panic: %v", cause)
		log.Errorf("Stacktrace:\n%s", debug.Stack())
	}()

	// blocking receive
	request := <-c.rebooter.GetChannel()
	log.Info("A plugin has requested a reboot.")

	if request != rebooter.RebootRequestTypeReboot {
		log.Error("reboot type not supported yet")
		return
	}

	log.Info("Processing reboot request...")
	c.stopCoreModules(softStopTimeout)
	if err := c.rebooter.RebootMachine(log); err != nil {
		log.Criticalf("Error rebooting the machine. Agent restart required Err: %v", err)
	}
}