agent/framework/processor/executer/iohandler/multiwriter/multiwriter.go (62 lines of code) (raw):
// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package multiwriter implements a multi-writer
package multiwriter
import (
"fmt"
"io"
"sync"
)
// DocumentIOMultiWriter is a multi-writer with support for close channel.
// This is responsible for creating a fan-out multi-writer which allows you to duplicate
// the writes to all the provided writers.
type DocumentIOMultiWriter interface {
AddWriter(*io.PipeWriter)
GetWaitGroup() *sync.WaitGroup
Write([]byte) (int, error)
WriteString(string) (int, error)
Close() error
}
// DefaultDocumentIOMultiWriter is the default implementation of multi-writer.
type DefaultDocumentIOMultiWriter struct {
writers []*io.PipeWriter
wg *sync.WaitGroup
}
// NewDocumentIOMultiWriter creates a new document multi-writer
func NewDocumentIOMultiWriter() (b *DefaultDocumentIOMultiWriter) {
var w []*io.PipeWriter
b = &DefaultDocumentIOMultiWriter{w, new(sync.WaitGroup)}
return
}
// AddWriter adds a new writer to an existing multi-writer
func (b *DefaultDocumentIOMultiWriter) AddWriter(writer *io.PipeWriter) {
b.writers = append(b.writers, writer)
b.wg.Add(1)
}
// GetStreamClosedChannel adds a new writer to an existing multi-writer
func (b *DefaultDocumentIOMultiWriter) GetWaitGroup() *sync.WaitGroup {
return b.wg
}
// Write is responsible for writing a byte to all the attached pipes.
func (b *DefaultDocumentIOMultiWriter) Write(p []byte) (n int, err error) {
if len(b.writers) == 0 {
return 0, fmt.Errorf("No writers present.")
}
for i := 0; i < len(b.writers); i++ {
n, err = b.writers[i].Write(p)
// TODO: Handler other error types and close the writers after a fixed number of retries
if err == io.ErrClosedPipe {
// remove the writer as the reader is closed
b.writers = append(b.writers[:i], b.writers[i+1:]...)
i--
}
if n != len(p) {
err = io.ErrShortWrite
}
}
return len(p), nil
}
// WriteString is responsible for writing a string to all the attached pipes.
func (b *DefaultDocumentIOMultiWriter) WriteString(message string) (n int, err error) {
if len(b.writers) == 0 {
return 0, fmt.Errorf("No writers present.")
}
for _, w := range b.writers {
p := []byte(message)
n, err = w.Write(p)
}
return len(message), nil
}
// Close waits for all the writers to be closed.
func (b *DefaultDocumentIOMultiWriter) Close() (err error) {
for i := 0; i < len(b.writers); i++ {
err = b.writers[i].Close()
}
b.wg.Wait()
return
}