agent/framework/docmanager/docmanager.go (358 lines of code) (raw):
// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package docmanager helps persist document state to disk.
package docmanager
import (
"os"
"path"
"path/filepath"
"regexp"
"runtime/debug"
"sync"
"time"
"github.com/aws/amazon-ssm-agent/agent/appconfig"
"github.com/aws/amazon-ssm-agent/agent/context"
"github.com/aws/amazon-ssm-agent/agent/contracts"
"github.com/aws/amazon-ssm-agent/agent/fileutil"
"github.com/aws/amazon-ssm-agent/agent/jsonutil"
"github.com/aws/amazon-ssm-agent/agent/log"
"github.com/aws/amazon-ssm-agent/agent/session/utility"
)
// maxOrchestrationDirectoryDeletions is the deletion budget checked by the
// orchestration cleanup loops in this file: once the running count of deleted
// directories reaches this value, the loop stops.
const maxOrchestrationDirectoryDeletions int = 1000
// Function signatures for string helpers.
type (
	// validString is a predicate over a single string.
	validString func(string) bool

	// modifyString maps one string to another string.
	modifyString func(string) string
)
// DocumentMgr is the set of operations for managing a persisted document
// state file, addressed by a file name and a location folder.
type DocumentMgr interface {
// MoveDocumentState moves fileName from srcLocationFolder to dstLocationFolder.
MoveDocumentState(fileName, srcLocationFolder, dstLocationFolder string)
// PersistDocumentState stores state under fileName in locationFolder.
PersistDocumentState(fileName, locationFolder string, state contracts.DocumentState)
// GetDocumentState returns the state stored under fileName in locationFolder.
GetDocumentState(fileName, locationFolder string) contracts.DocumentState
// RemoveDocumentState deletes the state stored under fileName in locationFolder.
RemoveDocumentState(fileName, locationFolder string)
}
// TODO use class lock instead of global lock?
// TODO decouple the DocState model to better fit the service-processor-executer architecture
// DocumentFileMgr encapsulates file access and performs bookkeeping operations at the specified file location.
// MoveDocumentState, PersistDocumentState and GetDocumentState build their directories as
// dataStorePath/<short instance ID>/rootDirName/stateLocation/<location folder>.
type DocumentFileMgr struct {
// context supplies the logger and the instance identity used by the methods.
context context.T
// dataStorePath is the first segment of the state path.
dataStorePath string
// rootDirName is the path segment placed right after the instance ID.
rootDirName string
// stateLocation is the path segment placed right before the location folder.
stateLocation string
}
// NewDocumentFileMgr returns a DocumentFileMgr that keeps the given context
// and the three path components (dataStorePath, rootDirName, stateLocation)
// exactly as supplied.
func NewDocumentFileMgr(context context.T, dataStorePath, rootDirName, stateLocation string) *DocumentFileMgr {
	mgr := new(DocumentFileMgr)
	mgr.context = context
	mgr.dataStorePath = dataStorePath
	mgr.rootDirName = rootDirName
	mgr.stateLocation = stateLocation
	return mgr
}
// MoveDocumentState moves fileName from srcLocationFolder to dstLocationFolder.
// Both folders are resolved under
// dataStorePath/<short instance ID>/rootDirName/stateLocation.
// A failure to obtain the instance ID is logged and the move is still
// attempted; the outcome of the move is only logged, never returned.
func (d *DocumentFileMgr) MoveDocumentState(fileName, srcLocationFolder, dstLocationFolder string) {
	logger := d.context.Log()

	shortID, idErr := d.context.Identity().ShortInstanceID()
	if idErr != nil {
		logger.Errorf("Failed to get short instanceID for MoveDocumentState: %v", idErr)
	}

	// locate resolves a location folder to its absolute state directory.
	locate := func(folder string) string {
		return path.Join(d.dataStorePath, shortID, d.rootDirName, d.stateLocation, folder)
	}

	moved, moveErr := fileutil.MoveFile(fileName, locate(srcLocationFolder), locate(dstLocationFolder))
	if !moved || moveErr != nil {
		logger.Debugf("moving file %v from %v to %v failed with error %v", fileName, srcLocationFolder, dstLocationFolder, moveErr)
		return
	}
	logger.Debugf("moved file %v from %v to %v successfully", fileName, srcLocationFolder, dstLocationFolder)
}
// PersistDocumentState marshals state to JSON and writes the indented result
// to fileName inside locationFolder, resolved under
// dataStorePath/<short instance ID>/rootDirName/stateLocation.
// An existing file is overwritten. Errors are logged, not returned; if the
// state cannot be marshalled nothing is written.
func (d *DocumentFileMgr) PersistDocumentState(fileName, locationFolder string, state contracts.DocumentState) {
	logger := d.context.Log()

	shortID, idErr := d.context.Identity().ShortInstanceID()
	if idErr != nil {
		logger.Errorf("Failed to get short instanceID for PersistDocumentState: %v", idErr)
	}

	stateDir := path.Join(d.dataStorePath, shortID, d.rootDirName, d.stateLocation, locationFolder)
	absoluteFileName := path.Join(stateDir, fileName)

	content, marshalErr := jsonutil.Marshal(state)
	if marshalErr != nil {
		logger.Errorf("encountered error with message %v while marshalling %v to string", marshalErr, state)
		return
	}

	if fileutil.Exists(absoluteFileName) {
		logger.Debugf("overwriting contents of %v", absoluteFileName)
	}
	logger.Tracef("persisting interim state %v in file %v", jsonutil.Indent(content), absoluteFileName)

	mode := os.FileMode(int(appconfig.ReadWriteAccess))
	written, writeErr := fileutil.WriteIntoFileWithPermissions(absoluteFileName, jsonutil.Indent(content), mode)
	if written && writeErr == nil {
		logger.Debugf("successfully persisted interim state in %v", locationFolder)
		return
	}
	logger.Debugf("persisting interim state in %v failed with error %v", locationFolder, writeErr)
}
// GetDocumentState reads the document state stored in fileName inside
// locationFolder, resolved under
// dataStorePath/<short instance ID>/rootDirName/stateLocation.
//
// If the file does not exist the zero-value state is returned. Reading is
// attempted up to three times with a 500ms pause after each failure. When all
// attempts fail and the file still exists, its contents are logged and the
// file is moved to appconfig.DefaultLocationOfCorrupt; the state variable is
// returned as it stands after the failed attempts.
func (d *DocumentFileMgr) GetDocumentState(fileName, locationFolder string) contracts.DocumentState {
	logger := d.context.Log()

	shortID, idErr := d.context.Identity().ShortInstanceID()
	if idErr != nil {
		logger.Errorf("Failed to get short instanceID for GetDocumentState: %v", idErr)
	}

	stateDir := path.Join(d.dataStorePath, shortID, d.rootDirName, d.stateLocation, locationFolder)
	absoluteFileName := path.Join(stateDir, fileName)

	var commandState contracts.DocumentState

	if found, _ := fileutil.LocalFileExist(absoluteFileName); !found {
		logger.Warnf("file not found in the docState directory %v", absoluteFileName)
		return commandState
	}

	// Retry because OfflineService and MessageDeliveryService may access the
	// file at the same time, which can make a single read fail.
	const retryLimit = 3
	failures := 0
	for failures < retryLimit {
		if readErr := jsonutil.UnmarshalFile(absoluteFileName, &commandState); readErr != nil {
			logger.Errorf("encountered error with message %v while reading Interim state of command from file - %v", readErr, fileName)
			failures++
			time.Sleep(500 * time.Millisecond)
			continue
		}

		// Trace the interim state exactly as it was read from the file.
		if jsonString, marshalErr := jsonutil.Marshal(commandState); marshalErr != nil {
			logger.Errorf("encountered error with message %v while marshalling %v to string", marshalErr, commandState)
		} else {
			logger.Tracef("interim CommandState read from file-system - %v", jsonutil.Indent(jsonString))
		}
		break
	}

	if failures < retryLimit {
		return commandState
	}

	// Every attempt failed: set the unreadable file aside if it is still there.
	if stillThere, _ := fileutil.LocalFileExist(absoluteFileName); !stillThere {
		return commandState
	}
	if documentContents, readErr := fileutil.ReadAllText(absoluteFileName); readErr == nil {
		logger.Infof("Document contents: %v", documentContents)
	}
	d.MoveDocumentState(fileName, locationFolder, appconfig.DefaultLocationOfCorrupt)
	return commandState
}
// RemoveDocumentState deletes the state file named commandID from
// locationFolder. The file path is resolved by docStateFileName. A failure to
// obtain the instance ID is logged and the deletion is still attempted; the
// outcome of the deletion is only logged, never returned.
//
// NOTE(review): unlike the other DocumentFileMgr methods, the path here is
// built from the appconfig defaults (via docStateFileName) rather than from
// d.dataStorePath / d.rootDirName / d.stateLocation - confirm this is intended.
func (d *DocumentFileMgr) RemoveDocumentState(commandID, locationFolder string) {
	log := d.context.Log()

	instanceID, err := d.context.Identity().ShortInstanceID()
	if err != nil {
		log.Errorf("Failed to get short instanceID for RemoveDocumentState: %v", err)
	}

	absoluteFileName := docStateFileName(commandID, instanceID, locationFolder)

	if err := fileutil.DeleteFile(absoluteFileName); err != nil {
		log.Errorf("encountered error %v while deleting file %v", err, absoluteFileName)
		return
	}
	log.Debugf("successfully deleted file %v", absoluteFileName)
}
// TODO rework this part
// DocumentStateDir returns the absolute directory in which command states are
// persisted for the given instance and location folder. The path is assembled
// from the appconfig defaults for the data store, document root and state
// location.
func DocumentStateDir(instanceID, locationFolder string) string {
	segments := []string{
		appconfig.DefaultDataStorePath,
		instanceID,
		appconfig.DefaultDocumentRootDirName,
		appconfig.DefaultLocationOfState,
		locationFolder,
	}
	return filepath.Join(segments...)
}
// orchestrationDir returns the absolute path of the orchestration directory
// named orchestrationRootDirName for the given instance.
//
// folderType selects the parent directory: when it equals
// appconfig.DefaultSessionRootDirName the session root is used; any other
// value falls back to appconfig.DefaultDocumentRootDirName.
//
// The path is built with filepath.Join so that it uses the operating system's
// separator, matching how callers extend the result with filepath.Join.
func orchestrationDir(instanceID, orchestrationRootDirName string, folderType string) string {
	rootDirName := appconfig.DefaultDocumentRootDirName
	if folderType == appconfig.DefaultSessionRootDirName {
		rootDirName = appconfig.DefaultSessionRootDirName
	}
	return filepath.Join(appconfig.DefaultDataStorePath,
		instanceID,
		rootDirName,
		orchestrationRootDirName)
}
// getOrchestrationDirectoryNames resolves the orchestration root directory for
// the given instance and folder type and lists the directory names inside it.
// The resolved root is always returned. When the root does not exist, an
// empty list and a nil error are returned.
func getOrchestrationDirectoryNames(log log.T, instanceID, orchestrationRootDirName string, folderType string) (orchestrationRootDir string, dirNames []string, err error) {
	orchestrationRootDir = orchestrationDir(instanceID, orchestrationRootDirName, folderType)

	if fileutil.Exists(orchestrationRootDir) {
		dirNames, err = fileutil.GetDirectoryNames(orchestrationRootDir)
		return orchestrationRootDir, dirNames, err
	}

	log.Debugf("Orchestration root directory doesn't exist: %v", orchestrationRootDir)
	return orchestrationRootDir, []string{}, nil
}
// runCommandDirNamePattern matches a whole string made of lowercase
// hexadecimal groups of 8-4-4-4-12 characters separated by dashes. It is
// compiled once instead of on every call.
var runCommandDirNamePattern = regexp.MustCompile("^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$")

// isRunCommandDirName reports whether dirName has the format used for
// RunCommand generated log directories: a lowercase, dash-separated
// hexadecimal identifier of the form 8-4-4-4-12.
func isRunCommandDirName(dirName string) (matched bool) {
	return runCommandDirNamePattern.MatchString(dirName)
}
// associationRunDirNamePattern matches a single-line string that starts with
// a date-like prefix of the form NNNN-NN-NN (digits only), followed by
// anything. It is compiled once instead of on every call.
var associationRunDirNamePattern = regexp.MustCompile("^[0-9]{4}-[0-9]{2}-[0-9]{2}.*$")

// isAssociationRunDirName reports whether dirName has the format used for
// association run directories, i.e. it begins with a NNNN-NN-NN date prefix.
func isAssociationRunDirName(dirName string) (matched bool) {
	return associationRunDirNamePattern.MatchString(dirName)
}
// cleanupAssociationDirectory deletes expired association-run directories
// found directly under commandOrchestrationPath.
//
// deletedCount is the running number of deletions made so far; it is
// incremented for every directory removed here and returned as
// deletedCountAfter. Sub-directories whose names are not association-run
// names are ignored.
//
// canDeleteDirectory is false when the directory listing fails, when the
// deletion budget (maxOrchestrationDirectoryDeletions) is reached, when a run
// directory has not yet expired, or when deleting a run directory fails.
func cleanupAssociationDirectory(log log.T, deletedCount int, commandOrchestrationPath string, retentionDurationHours int) (canDeleteDirectory bool, deletedCountAfter int) {
	runDirs, listErr := fileutil.GetDirectoryNames(commandOrchestrationPath)
	if listErr != nil {
		log.Infof("Error reading association orchestration directory %v: %v", commandOrchestrationPath, listErr)
		return false, deletedCount
	}

	parentDeletable := true
	log.Debugf("Starting deletion of association directories")

	for i := range runDirs {
		if deletedCount >= maxOrchestrationDirectoryDeletions {
			log.Infof("Reached max number of deletions for orchestration directories: %v", deletedCount)
			parentDeletable = false
			break
		}

		runDirName := runDirs[i]
		if !isAssociationRunDirName(runDirName) {
			continue
		}

		runDirPath := filepath.Join(commandOrchestrationPath, runDirName)
		log.Debugf("Checking association-run orchestration directory: %v", runDirPath)

		// A run directory that is still within retention keeps the parent alive.
		if !isOlderThan(log, runDirPath, retentionDurationHours) {
			parentDeletable = false
			continue
		}

		log.Debugf("Attempting deletion of association-run orchestration directory %v", runDirPath)
		if delErr := fileutil.DeleteDirectory(runDirPath); delErr != nil {
			log.Debugf("Error deleting directory %v: %v", runDirPath, delErr)
			parentDeletable = false
			continue
		}

		deletedCount++
	}

	log.Debugf("Finished deleting %v association directories", deletedCount)
	return parentDeletable, deletedCount
}
// isLegacyAssociationDirectory reports whether commandOrchestrationPath is a
// legacy association orchestration directory, which is the case when at least
// one of its sub-directories has an association-run name. A failure to list
// the directory is logged and returned together with false.
func isLegacyAssociationDirectory(log log.T, commandOrchestrationPath string) (bool, error) {
	entries, listErr := fileutil.GetDirectoryNames(commandOrchestrationPath)
	if listErr != nil {
		log.Debugf("Error reading orchestration directory %v: %v", commandOrchestrationPath, listErr)
		return false, listErr
	}

	// Stop scanning as soon as one run sub-directory has been seen.
	hasRunDir := false
	for i := 0; i < len(entries) && !hasRunDir; i++ {
		hasRunDir = isAssociationRunDirName(entries[i])
	}
	return hasRunDir, nil
}
// Package-level state that throttles how often stale orchestration files are
// rechecked. cleanupLock guards both maps, which are keyed by cleanup name.
var (
	cleanupLock sync.Mutex
	inCleanup   = make(map[string]bool)
	nextCleanup = make(map[string]time.Time) // a missing entry reads as the zero time, which is fine
)

// getLock tries to claim the cleanup identified by name. It returns false
// when that cleanup is already in progress or when the time recorded by
// updateTime has not been reached yet; otherwise it marks the cleanup as in
// progress and returns true.
func getLock(name string) bool {
	cleanupLock.Lock()
	defer cleanupLock.Unlock()

	notBefore := nextCleanup[name]
	switch {
	case inCleanup[name]:
		return false
	case !notBefore.IsZero() && time.Now().Before(notBefore):
		return false
	}

	inCleanup[name] = true
	return true
}

// releaseLock marks the cleanup identified by name as no longer in progress.
func releaseLock(name string) {
	cleanupLock.Lock()
	inCleanup[name] = false
	cleanupLock.Unlock()
}

// updateTime records that the cleanup identified by name may not be claimed
// again until 15 minutes from now.
func updateTime(name string) {
	cleanupLock.Lock()
	nextCleanup[name] = time.Now().Add(15 * time.Minute)
	cleanupLock.Unlock()
}
// DeleteOldOrchestrationDirectories deletes expired orchestration directories
// found under the document orchestration root of the given instance.
//
// Directories are considered expired based on retentionDurationHours; run
// directories inside legacy association directories are checked against
// associationRetentionDurationHours first, and the association directory
// itself is only considered once cleanupAssociationDirectory reports that it
// can be deleted.
//
// The call returns immediately when getLock refuses orchestrationRootDirName
// (another cleanup is running or it is too soon to retry). At most
// maxOrchestrationDirectoryDeletions directories are deleted per call. Panics
// are recovered and logged; errors are logged and never returned.
func DeleteOldOrchestrationDirectories(log log.T, instanceID, orchestrationRootDirName string, retentionDurationHours int, associationRetentionDurationHours int) {
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("Delete orchestration directories panic: %v", r)
			log.Errorf("Stacktrace:\n%s", debug.Stack())
		}
	}()

	// If somebody else is cleaning up this directory, or if it's too soon to try again, bail.
	if !getLock(orchestrationRootDirName) {
		return
	}
	// Make certain that we release our hold if something goes sideways.
	defer releaseLock(orchestrationRootDirName)

	orchestrationRootDir, dirNames, err := getOrchestrationDirectoryNames(log, instanceID, orchestrationRootDirName, appconfig.DefaultDocumentRootDirName)
	if err != nil {
		// Report both the directory and the error; the directory was previously missing from the message.
		log.Errorf("Failed to get orchestration directories under %v: %v", orchestrationRootDir, err)
		return
	}

	log.Debugf("Cleaning up orchestration directories: %v", orchestrationRootDir)

	deletedCount := 0
	for _, dirName := range dirNames {
		if deletedCount >= maxOrchestrationDirectoryDeletions {
			log.Warnf("Reached max number of deletions for orchestration directories: %v", deletedCount)
			break
		}

		commandOrchestrationPath := filepath.Join(orchestrationRootDir, dirName)

		// Legacy association directories hold one sub-directory per run; clean
		// those first and keep the parent if any of them has to stay.
		if isAssoc, err := isLegacyAssociationDirectory(log, commandOrchestrationPath); isAssoc && err == nil {
			var canDeleteDirectory bool
			canDeleteDirectory, deletedCount = cleanupAssociationDirectory(log, deletedCount, commandOrchestrationPath, associationRetentionDurationHours)
			if !canDeleteDirectory {
				continue
			}
		}

		log.Debugf("Checking command orchestration directory: %v", commandOrchestrationPath)
		if isOlderThan(log, commandOrchestrationPath, retentionDurationHours) {
			log.Debugf("Attempting deletion of command orchestration directory: %v", commandOrchestrationPath)

			err := fileutil.DeleteDirectory(commandOrchestrationPath)
			if err != nil {
				log.Debugf("Error deleting directory %v: %v", commandOrchestrationPath, err)
				continue
			}

			// The orchestration directory was deleted successfully.
			deletedCount++
		}
	}

	// Only a completed pass postpones the next cleanup.
	updateTime(orchestrationRootDirName)
	log.Debugf("Completed orchestration directory clean up of %v items", deletedCount)
}
// DeleteSessionOrchestrationDirectories deletes expired session orchestration
// directories found under the session orchestration root of the given
// instance, based on retentionDurationHours.
//
// The call returns immediately when getLock refuses the session lock name
// (another cleanup is running or it is too soon to retry). At most
// maxOrchestrationDirectoryDeletions directories are deleted per call. When
// deleting a directory fails, utility.SessionUtil.DeleteIpcTempFile is tried
// as a fallback. Panics are recovered and logged; errors are logged and never
// returned.
func DeleteSessionOrchestrationDirectories(log log.T, instanceID, orchestrationRootDirName string, retentionDurationHours int) {
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("Delete session orchestration directories panic: %v", r)
			log.Errorf("Stacktrace:\n%s", debug.Stack())
		}
	}()

	// Add lock to prevent quadratic fs lookup for large volume session users.
	sessionLockName := orchestrationRootDirName + appconfig.DefaultSessionRootDirName
	if !getLock(sessionLockName) {
		return
	}
	defer releaseLock(sessionLockName)

	orchestrationRootDir, dirNames, err := getOrchestrationDirectoryNames(log, instanceID, orchestrationRootDirName, appconfig.DefaultSessionRootDirName)
	if err != nil {
		// Report both the directory and the error; the directory was previously missing from the message.
		log.Debugf("Failed to get orchestration directories under %v: %v", orchestrationRootDir, err)
		return
	}

	log.Debugf("Cleaning up orchestration directories: %v", orchestrationRootDir)

	deletedCount := 0
	for _, dirName := range dirNames {
		if deletedCount >= maxOrchestrationDirectoryDeletions {
			log.Infof("Reached max number of deletions for orchestration directories: %v", deletedCount)
			break
		}

		sessionOrchestrationPath := filepath.Join(orchestrationRootDir, dirName)

		if isOlderThan(log, sessionOrchestrationPath, retentionDurationHours) {
			log.Debugf("Attempting deletion of session orchestration directory: %v", sessionOrchestrationPath)

			err := fileutil.DeleteDirectory(sessionOrchestrationPath)
			if err != nil {
				log.Debugf("Error deleting directory %v: %v", sessionOrchestrationPath, err)

				// With CloudWatch streaming of logs, a change was introduced to make ipcTempFile append-only on Linux.
				// The append-only mode makes deleting the file fail.
				// The logic below attempts to delete ipcTempFile when such an error occurs.
				u := &utility.SessionUtil{}
				success, err := u.DeleteIpcTempFile(log, sessionOrchestrationPath)
				if err != nil || !success {
					log.Debugf("Retry attempt to delete session orchestration directory %s failed, %v", sessionOrchestrationPath, err)
					continue
				}
			}

			// Count the directory when either the direct delete or the ipcTempFile fallback reported success.
			deletedCount++
		}
	}

	// Only a completed pass postpones the next cleanup.
	updateTime(sessionLockName)
	log.Debugf("Completed session orchestration directory clean up of %v items", deletedCount)
}
// isOlderThan reports whether the modification time of fileFullPath plus
// retentionDurationHours lies in the past. When the modification time cannot
// be read, the failure is logged and false is returned.
func isOlderThan(log log.T, fileFullPath string, retentionDurationHours int) bool {
	modifiedAt, statErr := fileutil.GetFileModificationTime(fileFullPath)
	if statErr != nil {
		log.Debugf("Failed to get modification time %v", statErr)
		return false
	}

	retention := time.Duration(retentionDurationHours) * time.Hour
	expiresAt := modifiedAt.Add(retention)
	return time.Now().After(expiresAt)
}
// docStateFileName returns absolute filename where command states are persisted
func docStateFileName(fileName, instanceID, locationFolder string) string {
return path.Join(DocumentStateDir(instanceID, locationFolder), fileName)
}