lib/ec2macossystemmonitor/relayd.go (150 lines of code) (raw):
package ec2macossystemmonitor
import (
"bytes"
"compress/zlib"
"encoding/base64"
"encoding/json"
"fmt"
"hash/adler32"
"net"
"os"
"sync/atomic"
"time"
)
// Socket settings shared by the relayd client helpers and the relay server.
const (
	// SocketTimeout is how far ahead StartRelay pushes the listener deadline
	// before each Accept call.
	SocketTimeout = 5 * time.Second

	// DefaultRelaydSocketPath is the default socket for relayd listener.
	DefaultRelaydSocketPath = "/tmp/.ec2monitoring.sock"
)
// CheckSocketExists is a convenience helper for checking whether the service
// UDS is present at socketPath. It returns whatever fileExists reports for
// that path.
func CheckSocketExists(socketPath string) (exists bool) {
	exists = fileExists(socketPath)
	return exists
}
// BuildMessage wraps data for the given tag in the relay envelope and returns the encoded bytes.
//
// The tag namespaces the supported payloads; the format of data is specific to each tag. When compress is true the
// data is zlib compressed and then base64 encoded before being placed in the payload, and the compress flag is
// recorded in the payload so the receiver knows how to decode it. The payload is marshaled to JSON, then wrapped in a
// SerialMessage carrying the payload's Adler-32 checksum. The resulting bytes (terminated by a newline) are returned
// rather than sent so the caller can log them if desired before handing them to PassToRelayd.
func BuildMessage(tag string, data string, compress bool) ([]byte, error) {
	// Some payloads exceed what can be sent on the serial device, so compression allows more data to be sent;
	// base64 keeps the compressed bytes to characters that are safe to pass on the device.
	body := data
	if compress {
		var compressed bytes.Buffer
		zw, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
		if err != nil {
			return nil, fmt.Errorf("ec2macossystemmonitor: couldn't get compression writer: %w", err)
		}
		if _, err = zw.Write([]byte(data)); err != nil {
			return nil, fmt.Errorf("ec2macossystemmonitor: couldn't copy compressed data: %w", err)
		}
		// Close flushes the remaining compressed output into the buffer.
		if err = zw.Close(); err != nil {
			return nil, fmt.Errorf("ec2macossystemmonitor: couldn't close compressor: %w", err)
		}
		body = base64.StdEncoding.EncodeToString(compressed.Bytes())
	}

	// Marshal the payload first: the checksum in the outer message is computed over these exact bytes.
	inner, err := json.Marshal(SerialPayload{
		Tag:      tag,
		Compress: compress,
		Data:     body,
	})
	if err != nil {
		return nil, fmt.Errorf("ec2macossystemmonitor: %w", err)
	}

	envelope := SerialMessage{
		Checksum: adler32.Checksum(inner),
		Payload:  string(inner),
	}
	outer, err := json.Marshal(envelope)
	if err != nil {
		return nil, fmt.Errorf("ec2macossystemmonitor: marshal: %w", err)
	}

	// FIXME: message shouldn't append the newline, that's up to clients to
	// decide (ie: flushing data as needed to clients/servers).
	return append(outer, '\n'), nil
}
// PassToRelayd writes messageBytes to the relayd UNIX socket at
// DefaultRelaydSocketPath so the relay can forward it.
//
// It returns the number of bytes written. An error is returned when the socket
// path does not exist, when the connection cannot be established, or when the
// write fails; connection and write errors wrap the underlying error so callers
// can inspect it with errors.Is / errors.As.
func PassToRelayd(messageBytes []byte) (n int, err error) {
	// Make sure we have socket to connect to. The message is included in the
	// error so that it is not silently lost when the relay is not running.
	if !fileExists(DefaultRelaydSocketPath) {
		return 0, fmt.Errorf("ec2macossystemmonitor: %s does not exist, cannot send message: %s", DefaultRelaydSocketPath, string(messageBytes))
	}

	// Connect and relay!
	sock, err := net.Dial("unix", DefaultRelaydSocketPath)
	if err != nil {
		return 0, fmt.Errorf("ec2macossystemmonitor: could not connect to %s: %w", DefaultRelaydSocketPath, err)
	}
	defer sock.Close()

	n, err = sock.Write(messageBytes)
	if err != nil {
		// n is still reported so the caller knows how much was sent before the failure.
		return n, fmt.Errorf("ec2macossystemmonitor: error while writing to socket: %w", err)
	}
	return n, nil
}
// SendMessage is a convenience wrapper that builds the relay message for tag and data (optionally compressed) with
// BuildMessage and immediately hands the result to PassToRelayd. It returns PassToRelayd's byte count and error, or
// zero and a wrapped error if the message could not be built.
func SendMessage(tag string, data string, compress bool) (n int, err error) {
	var envelope []byte
	if envelope, err = BuildMessage(tag, data, compress); err != nil {
		return 0, fmt.Errorf("ec2macossystemmonitor: error while building message bytes: %w", err)
	}
	return PassToRelayd(envelope)
}
// SerialRelay manages client & listener to relay received messages to a serial
// connection.
type SerialRelay struct {
// serialConnection is the managed serial device connection for writing
// (ie: relayed output).
serialConnection *SerialConnection
// listener handles connections to relay received messages to the configured
// serialConnection.
listener net.Listener
// ReadyToClose is the channel for communicating the need to close
// connections. StartRelay checks it without blocking each time Accept
// returns; receiving any value triggers CleanUp and ends the accept loop.
// NewRelay creates it unbuffered, so a send blocks until StartRelay
// receives it.
//
// TODO: use context as replacement for cancellation
ReadyToClose chan bool
}
// NewRelay creates an instance of the relay server and returns a SerialRelay for manual closing.
//
// It opens a serial connection to serialDevice, removes anything left at DefaultRelaydSocketPath, and starts listening
// on a UNIX socket at that path. If any step after opening the serial connection fails, the serial connection is
// closed again before the error is returned so that the device is not leaked.
//
// The SerialRelay returned from NewRelay is designed to be used in a go routine by using StartRelay. This allows the
// caller to handle OS Signals and other events for clean shutdown rather than relying upon defer calls.
func NewRelay(serialDevice string) (relay SerialRelay, err error) {
	const socketPath = DefaultRelaydSocketPath

	// Create a serial connection
	serCon, err := NewSerialConnection(serialDevice)
	if err != nil {
		return SerialRelay{}, fmt.Errorf("relayd: failed to build a connection to serial interface: %w", err)
	}
	// From here on the serial connection is open: release it on every failure
	// path. err is the named result, so it reflects whatever is being returned.
	defer func() {
		if err != nil {
			// Best-effort close; the error that caused the failure is the one worth reporting.
			_ = serCon.Close()
		}
	}()

	// Remove any stale socket left behind by a previous run so that listening can succeed.
	if err = os.RemoveAll(socketPath); err != nil {
		if _, ok := err.(*os.PathError); ok {
			// Help guide that the SocketPath is invalid
			return SerialRelay{}, fmt.Errorf("relayd: unable to clean %s: %w", socketPath, err)
		}
		// Unknown issue, return the error directly
		return SerialRelay{}, err
	}

	// Create the UDS listener.
	addr, err := net.ResolveUnixAddr("unix", socketPath)
	if err != nil {
		return SerialRelay{}, fmt.Errorf("relayd: unable to resolve address: %w", err)
	}
	listener, err := net.ListenUnix("unix", addr)
	if err != nil {
		return SerialRelay{}, fmt.Errorf("relayd: unable to listen on socket: %w", err)
	}

	return SerialRelay{
		listener:         listener,
		serialConnection: serCon,
		ReadyToClose:     make(chan bool),
	}, nil
}
// setListenerDeadline applies t as the deadline of the relay's listener when
// the concrete listener type offers a SetDeadline method. Listeners without
// one are left untouched and nil is returned.
func (relay *SerialRelay) setListenerDeadline(t time.Time) error {
	// net.Listener itself has no deadline support, so probe for it.
	type deadlineSetter interface {
		SetDeadline(time.Time) error
	}
	if ds, ok := relay.listener.(deadlineSetter); ok {
		return ds.SetDeadline(t)
	}
	return nil
}
// StartRelay starts the listener and handles connections for the serial relay.
//
// This is a server implementation of the SerialRelay so it logs to a provided
// logger, and empty logger can be provided to stop logging if desired. This
// function is designed to be used in a go routine so logging may be the only
// way to get data about behavior while it is running. Connections are handled
// one at a time: each accepted connection is relayed to the serial connection
// before the next Accept, and the number of bytes relayed is added to
// relayStatus.Written.
//
// The resources can be shut down by sending a value on the ReadyToClose
// channel. The channel is only checked after Accept returns, so shutdown is
// noticed once a connection arrives or the SocketTimeout deadline expires.
// Shutdown invokes CleanUp() which is exported in case the caller desires to
// call it instead.
func (relay *SerialRelay) StartRelay(logger *Logger, relayStatus *StatusLogBuffer) {
	// Accept new connections and relay each one in turn.
	for {
		// Bound Accept with a deadline so the loop wakes up regularly to look
		// for a shutdown request even when no clients connect.
		err := relay.setListenerDeadline(time.Now().Add(SocketTimeout))
		if err != nil {
			logger.Fatal("Unable to set deadline on socket:", err)
		}
		socCon, err := relay.listener.Accept()
		// Look for signal to exit, otherwise keep going, check the error only if we aren't supposed to shutdown
		select {
		case <-relay.ReadyToClose:
			logger.Info("[relayd] requested to shutdown")
			// Accept may have handed us a connection in the same iteration
			// that shutdown was requested; it will never be relayed, so close
			// it rather than leaking it.
			if err == nil {
				_ = socCon.Close()
			}
			// Clean up resources manually
			relay.CleanUp()
			// Return to stop the connections from continuing
			return
		default:
			// If ReadyToClose has not been sent, then check for errors, handle timeouts, otherwise process
			if err != nil {
				if er, ok := err.(net.Error); ok && er.Timeout() {
					// This is just a timeout, go back to the top to start listening again
					continue
				}
				// This is some other error, for Accept(), its a fatal error if we can't Accept()
				logger.Fatal("Unable to start accepting on socket:", err)
			}
		}
		// Write the data to the relay
		// NOTE(review): socCon is not closed here; presumably RelayData takes
		// ownership of the connection - confirm.
		written, err := relay.serialConnection.RelayData(socCon)
		if err != nil {
			logger.Errorf("Failed to send data: %s\n", err)
		}
		// Increment the counter (bytes are counted even when RelayData reported an error)
		atomic.AddInt64(&relayStatus.Written, int64(written))
	}
}
// CleanUp manually closes the connections for a Serial Relay. This is called from StartRelay when a value is received
// on ReadyToClose so it should only be called separately if closing outside of that mechanism.
//
// It closes the listener, closes the serial connection, and then removes the socket file at DefaultRelaydSocketPath.
// Every step is best-effort: errors are discarded and nothing is reported to the caller.
func (relay *SerialRelay) CleanUp() {
// Stop accepting new connections first.
_ = relay.listener.Close()
// Then release the serial device.
_ = relay.serialConnection.Close()
// Finally remove the socket path so nothing is left behind on disk.
_ = os.RemoveAll(DefaultRelaydSocketPath)
}