agent/framework/processor/executer/iohandler/iohandler.go (326 lines of code) (raw):
// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package iohandler implements the iohandler for the plugins
package iohandler
import (
"bytes"
"fmt"
"io"
"runtime/debug"
"github.com/aws/amazon-ssm-agent/agent/agentlogstocloudwatch/cloudwatchlogspublisher"
"github.com/aws/amazon-ssm-agent/agent/context"
"github.com/aws/amazon-ssm-agent/agent/contracts"
"github.com/aws/amazon-ssm-agent/agent/fileutil"
"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer/iohandler/iomodule"
"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer/iohandler/multiwriter"
)
const (
// maximumPluginOutputSize represents the maximum output size that agent supports
MaximumPluginOutputSize = 2500
// errTitle represents the header that is printed before the error
errTitle = "\n----------ERROR-------\n"
// truncateOut represents the string appended when output is truncated
truncateOut = "\n---Output truncated---"
// truncateError represents the string appended when error is truncated
truncateError = "\n---Error truncated----"
)
// PluginConfig is used for initializing plugins with default values
type PluginConfig struct {
StdoutFileName string
StderrFileName string
StdoutConsoleFileName string
StderrConsoleFileName string
MaxStdoutLength int
MaxStderrLength int
OutputTruncatedSuffix string
}
// DefaultOutputConfig returns the default values for the plugin
func DefaultOutputConfig() PluginConfig {
return PluginConfig{
StdoutFileName: "stdout",
StderrFileName: "stderr",
StdoutConsoleFileName: "stdoutConsole",
StderrConsoleFileName: "stderrConsole",
MaxStdoutLength: 24000,
MaxStderrLength: 8000,
OutputTruncatedSuffix: "--output truncated--",
}
}
// IOHandler Interface defines interface for IOHandler type
type IOHandler interface {
Init(...string)
RegisterOutputSource(multiwriter.DocumentIOMultiWriter, ...iomodule.IOModule)
Close()
String() string
MarkAsFailed(err error)
MarkAsSucceeded()
MarkAsInProgress()
MarkAsSuccessWithReboot()
MarkAsCancelled()
MarkAsShutdown()
AppendInfo(message string)
AppendInfof(format string, params ...interface{})
AppendError(message string)
AppendErrorf(format string, params ...interface{})
// getters/setters
GetStatus() contracts.ResultStatus
GetStdout() string
GetStderr() string
GetExitCode() int
GetStdoutWriter() multiwriter.DocumentIOMultiWriter
GetStderrWriter() multiwriter.DocumentIOMultiWriter
GetIOConfig() contracts.IOConfiguration
SetStatus(contracts.ResultStatus)
SetExitCode(int)
SetOutput(interface{})
SetStdout(string)
SetStderr(string)
}
// DefaultIOHandler is used for writing output by the plugins
type DefaultIOHandler struct {
context context.T
ExitCode int
Status contracts.ResultStatus
//private members - not exposed directly to plugins because they shouldn't write to these
stdout string
stderr string
ioConfig contracts.IOConfiguration
//refreshassociation and invoker write a different output rather than merging stdout and stderr
output interface{}
// List of Writers attached to the IOHandler instance
StdoutWriter multiwriter.DocumentIOMultiWriter
StderrWriter multiwriter.DocumentIOMultiWriter
}
// NewDefaultIOHandler returns a new instance of the IOHandler
func NewDefaultIOHandler(context context.T, ioConfig contracts.IOConfiguration) *DefaultIOHandler {
context.Log().Debugf("IOHandler Initialization with config: %v", ioConfig)
out := new(DefaultIOHandler)
out.context = context
out.ioConfig = ioConfig
return out
}
// Init initializes the plugin output object by creating the necessary writers
func (out *DefaultIOHandler) Init(filePath ...string) {
log := out.context.Log()
pluginConfig := DefaultOutputConfig()
// Create path to output location for file and s3
fullPath := fileutil.BuildSafePath(out.ioConfig.OrchestrationDirectory, filePath...)
s3KeyPrefix := fileutil.BuildS3Path(out.ioConfig.OutputS3KeyPrefix, filePath...)
stdOutLogStreamName := ""
stdErrLogStreamName := ""
if out.ioConfig.CloudWatchConfig.LogGroupName != "" {
cwl := cloudwatchlogspublisher.NewCloudWatchLogsService(out.context)
if err := cwl.CreateLogGroup(out.ioConfig.CloudWatchConfig.LogGroupName); err != nil {
log.Errorf("Error Creating Log Group for CloudWatchLogs output: %v", err)
//Stop CloudWatch Streaming on Error
out.ioConfig.CloudWatchConfig.LogGroupName = ""
}
stdOutLogStreamName = fmt.Sprintf("%s/%s", out.ioConfig.CloudWatchConfig.LogStreamPrefix, pluginConfig.StdoutFileName)
stdErrLogStreamName = fmt.Sprintf("%s/%s", out.ioConfig.CloudWatchConfig.LogStreamPrefix, pluginConfig.StderrFileName)
}
// Initialize file output module
stdoutFile := iomodule.File{
FileName: pluginConfig.StdoutFileName,
OrchestrationDirectory: fullPath,
OutputS3BucketName: out.ioConfig.OutputS3BucketName,
OutputS3KeyPrefix: s3KeyPrefix,
LogGroupName: out.ioConfig.CloudWatchConfig.LogGroupName,
LogStreamName: stdOutLogStreamName,
}
// Initialize console output module
stdoutConsole := iomodule.CommandOutput{
OutputString: &out.stdout,
FileName: pluginConfig.StdoutConsoleFileName,
OrchestrationDirectory: fullPath,
}
log.Debug("Initializing the Stdout Multi-writer with file and console listeners")
// Get a multi-writer for standard output
out.StdoutWriter = multiwriter.NewDocumentIOMultiWriter()
out.RegisterOutputSource(out.StdoutWriter, stdoutFile, stdoutConsole)
// Initialize file error module
stderrFile := iomodule.File{
FileName: pluginConfig.StderrFileName,
OrchestrationDirectory: fullPath,
OutputS3BucketName: out.ioConfig.OutputS3BucketName,
OutputS3KeyPrefix: s3KeyPrefix,
LogGroupName: out.ioConfig.CloudWatchConfig.LogGroupName,
LogStreamName: stdErrLogStreamName,
}
// Initialize console error module
stderrConsole := iomodule.CommandOutput{
OutputString: &out.stderr,
FileName: pluginConfig.StderrConsoleFileName,
OrchestrationDirectory: fullPath,
}
log.Debug("Initializing the Stderr Multi-writer with file and console listeners")
// Get a multi-writer for standard error
out.StderrWriter = multiwriter.NewDocumentIOMultiWriter()
out.RegisterOutputSource(out.StderrWriter, stderrFile, stderrConsole)
}
// RegisterOutputSource returns a new output source by creating a multiwriter for the output modules.
func (out *DefaultIOHandler) RegisterOutputSource(multiWriter multiwriter.DocumentIOMultiWriter, IOModules ...iomodule.IOModule) {
if len(IOModules) == 0 {
return
}
log := out.context.Log()
wg := multiWriter.GetWaitGroup()
// Create a Pipe for each IO Module and add it to the multi-writer.
for _, module := range IOModules {
r, w := io.Pipe()
multiWriter.AddWriter(w)
// Run the reader for each module
log.Debug("Starting a new stream reader go routing")
go func(module iomodule.IOModule, r *io.PipeReader) {
defer func() {
if r := recover(); r != nil {
log.Errorf("Stream reader panic: %v", r)
log.Errorf("Stacktrace:\n%s", debug.Stack())
}
}()
defer wg.Done()
module.Read(out.context, r, out.ExitCode)
}(module, r)
}
return
}
// Close closes all the attached writers.
func (out *DefaultIOHandler) Close() {
log := out.context.Log()
log.Debug("IOHandler closing all subscribed writers.")
if out.StdoutWriter != nil {
out.StdoutWriter.Close()
}
if out.StderrWriter != nil {
out.StderrWriter.Close()
}
}
// String returns the output by concatenating stdout and stderr
func (out DefaultIOHandler) String() (response string) {
// exit code 168 is a successful execution signaling all future steps should be ignored
// therefore, we remove the error message of exit status 168 which would be confusing in the output
if out.ExitCode == contracts.ExitWithSuccess {
out.stderr = ""
}
return TruncateOutput(out.stdout, out.stderr, MaximumPluginOutputSize)
}
// GetOutput returns the output to be appended to the response
func (out DefaultIOHandler) GetOutput() interface{} {
// Return output if assigned. Otherwise, return stdout + stderr
if out.output == nil {
return out.String()
}
return out.output
}
// GetStatus returns the status
func (out DefaultIOHandler) GetStatus() contracts.ResultStatus {
return out.Status
}
// GetStdout returns the stdout
func (out DefaultIOHandler) GetStdout() string {
return out.stdout
}
// GetExitCode returns the exit code
func (out DefaultIOHandler) GetExitCode() int {
return out.ExitCode
}
// GetStderr returns the stderr
func (out DefaultIOHandler) GetStderr() string {
return out.stderr
}
// GetIOConfig returns the io configuration
func (out DefaultIOHandler) GetIOConfig() contracts.IOConfiguration {
return out.ioConfig
}
// GetStdoutWriter returns the stdout writer
func (out DefaultIOHandler) GetStdoutWriter() multiwriter.DocumentIOMultiWriter {
return out.StdoutWriter
}
// GetStderrWriter returns the stderr writer
func (out DefaultIOHandler) GetStderrWriter() multiwriter.DocumentIOMultiWriter {
return out.StderrWriter
}
// SetStatus sets the status
func (out *DefaultIOHandler) SetStatus(status contracts.ResultStatus) {
out.Status = status
}
// SetStdout sets the stdout
func (out *DefaultIOHandler) SetStdout(stdout string) {
out.stdout = stdout
}
// SetStderr sets the stderr
func (out *DefaultIOHandler) SetStderr(stderr string) {
out.stderr = stderr
}
// SetExitCode sets the exit code
func (out *DefaultIOHandler) SetExitCode(exitCode int) {
out.ExitCode = exitCode
}
// SetOutput sets the output
func (out *DefaultIOHandler) SetOutput(output interface{}) {
out.output = output
}
// Merge plugin output objects
func (out *DefaultIOHandler) Merge(mergeOutput *DefaultIOHandler) {
// Append Info
var stdoutBuffer bytes.Buffer
if len(out.stdout) > 0 {
stdoutBuffer.WriteString(out.stdout + "\n")
}
stdoutBuffer.WriteString(mergeOutput.GetStdout())
out.stdout = stdoutBuffer.String()
// Append Error
var stderrBuffer bytes.Buffer
if len(out.stderr) > 0 {
stderrBuffer.WriteString(out.stderr + "\n")
}
stderrBuffer.WriteString(mergeOutput.GetStderr())
out.stderr = stderrBuffer.String()
if out.ExitCode == 0 {
out.ExitCode = mergeOutput.GetExitCode()
}
out.Status = contracts.MergeResultStatus(out.Status, mergeOutput.GetStatus())
}
// MarkAsFailed Failed marks plugin as Failed
func (out *DefaultIOHandler) MarkAsFailed(err error) {
// Update the error exit code
if out.ExitCode == 0 {
out.ExitCode = 1
}
out.Status = contracts.ResultStatusFailed
if err != nil {
out.AppendError(err.Error())
}
}
// MarkAsSucceeded marks plugin as Successful.
func (out *DefaultIOHandler) MarkAsSucceeded() {
out.ExitCode = 0
out.Status = contracts.ResultStatusSuccess
}
// MarkAsInProgress marks plugin as In Progress.
func (out *DefaultIOHandler) MarkAsInProgress() {
out.ExitCode = 0
out.Status = contracts.ResultStatusInProgress
}
// MarkAsSuccessWithReboot marks plugin as Successful and requests a reboot.
func (out *DefaultIOHandler) MarkAsSuccessWithReboot() {
out.ExitCode = 0
out.Status = contracts.ResultStatusSuccessAndReboot
}
// MarkAsCancelled marks a plugin as Cancelled.
func (out *DefaultIOHandler) MarkAsCancelled() {
out.ExitCode = 1
out.Status = contracts.ResultStatusCancelled
}
// MarkAsShutdown marks a plugin as Failed in the case of interruption due to shutdown signal.
func (out *DefaultIOHandler) MarkAsShutdown() {
out.ExitCode = 1
out.Status = contracts.ResultStatusCancelled
}
// AppendInfo adds info to IOHandler StandardOut.
func (out *DefaultIOHandler) AppendInfo(message string) {
if len(message) > 0 && out.StdoutWriter != nil {
if len(out.stdout) > 0 {
out.StdoutWriter.WriteString("\n")
}
out.StdoutWriter.WriteString(message)
} else {
// Write to stdout if the writer is not defined.
if len(out.stdout) > 0 {
out.stdout = fmt.Sprintf("%v\n%v", out.stdout, message)
} else {
out.stdout = message
}
}
}
// AppendInfof adds info to DefaultIOHandler StandardOut with formatting parameters.
func (out *DefaultIOHandler) AppendInfof(format string, params ...interface{}) {
if len(format) > 0 {
message := fmt.Sprintf(format, params...)
out.AppendInfo(message)
}
}
// AppendError adds errors to DefaultIOHandler StandardErr.
func (out *DefaultIOHandler) AppendError(message string) {
if len(message) > 0 && out.StderrWriter != nil {
if len(out.stderr) > 0 {
out.StderrWriter.WriteString("\n")
}
out.StderrWriter.WriteString(message)
} else {
// Write to stderr if the writer is not defined.
if len(out.stderr) > 0 {
out.stderr = fmt.Sprintf("%v\n%v", out.stderr, message)
} else {
out.stderr = message
}
}
}
// AppendErrorf adds errors to DefaultIOHandler StandardErr with formatting parameters.
func (out *DefaultIOHandler) AppendErrorf(format string, params ...interface{}) {
if len(format) > 0 {
message := fmt.Sprintf(format, params...)
out.AppendError(message)
}
}
// TruncateOutput truncates the output
func TruncateOutput(stdout string, stderr string, capacity int) (response string) {
outputSize := len(stdout)
errorSize := len(stderr)
// prepare error title
errorTitle := ""
lenErrorTitle := 0
if errorSize > 0 {
errorTitle = errTitle
lenErrorTitle = len(errorTitle)
}
// calculate available space
availableSpace := capacity - lenErrorTitle
// all fits within availableSpace
if (outputSize + errorSize) < availableSpace {
return fmt.Sprint(stdout, errorTitle, stderr)
}
// truncate out and error when both exceed the size
if outputSize > availableSpace/2 && errorSize > availableSpace/2 {
truncateSize := availableSpace - len(truncateError) - len(truncateOut)
return fmt.Sprint(stdout[:truncateSize/2], truncateOut, errorTitle, stderr[:truncateSize/2], truncateError)
}
// truncate error when output is short
if outputSize <= availableSpace/2 {
truncateSize := availableSpace - len(truncateError)
return fmt.Sprint(stdout, errorTitle, stderr[:truncateSize-outputSize], truncateError)
}
// truncate output when error is short
truncateSize := availableSpace - len(truncateOut)
return fmt.Sprint(stdout[:truncateSize-errorSize], truncateOut, errorTitle, stderr)
}