agent/session/channel/websocketchannel.go (136 lines of code) (raw):
package channel
import (
"errors"
"net/http"
"runtime/debug"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/aliyun/aliyun_assist_client/agent/log"
"github.com/aliyun/aliyun_assist_client/common/httpbase"
)
// IWebSocketChannel is the interface for ControlChannel and DataChannel.
type IWebSocketChannel interface {
Initialize(channelUrl string,
onMessageHandler func([]byte),
onErrorHandler func(error)) error
Open() error
Close() error
StartPings()
IsActive() bool
SendMessage(input []byte, inputType int) error
}
type WebSocketChannel struct {
OnMessage func([]byte)
OnError func(error)
Connection *websocket.Conn
Url string
IsOpen bool
writeLock *sync.Mutex
}
func (webSocketChannel *WebSocketChannel) Initialize(channelUrl string,
onMessageHandler func([]byte),
onErrorHandler func(error)) error {
webSocketChannel.Url = channelUrl
webSocketChannel.OnError = onErrorHandler
webSocketChannel.OnMessage = onMessageHandler
return nil
}
func (webSocketChannel *WebSocketChannel) IsActive() bool {
return webSocketChannel.IsOpen == true
}
func (webSocketChannel *WebSocketChannel) Open() error {
// initialize the write mutex
logger := log.GetLogger().WithField(
"wssurl", webSocketChannel.Url,
)
log.GetLogger().Infoln("WebSocketChannel Open")
webSocketChannel.writeLock = &sync.Mutex{}
header := http.Header{
httpbase.UserAgentHeader: []string{httpbase.UserAgentValue},
}
ws, err := NewWebsocketUtil(logger, nil).OpenConnection(webSocketChannel.Url, header)
if err != nil {
log.GetLogger().Errorln("WebSocketChannel Open failed", err)
return err
}
webSocketChannel.Connection = ws
webSocketChannel.IsOpen = true
webSocketChannel.StartPings()
// spin up a different routine to listen to the incoming traffic
go func() {
defer func() {
if msg := recover(); msg != nil {
log.GetLogger().Errorf("WebsocketChannel listener run panic: %v", msg)
log.GetLogger().Errorf("%s: %s", msg, debug.Stack())
}
}()
retryCount := 0
for {
if webSocketChannel.IsOpen == false {
log.GetLogger().Info("Ending the channel listening routine since the channel is closed")
break
}
messageType, rawMessage, err := webSocketChannel.Connection.ReadMessage()
if err != nil {
retryCount++
if retryCount >= 10 {
log.GetLogger().Warnf("Reach the retry limit for receive messages. Error: %v", err.Error())
webSocketChannel.OnError(err)
break
}
log.GetLogger().Debugf(
"An error happened when receiving the message. Retried times: %d, MessageType: %v, Error: %s",
retryCount,
messageType,
err.Error())
} else if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
// We only accept text messages which are interpreted as UTF-8 or binary encoded text.
log.GetLogger().Errorf("Invalid message type %d. We only accept UTF-8 or binary encoded text", messageType)
} else {
retryCount = 0
webSocketChannel.OnMessage(rawMessage)
}
}
}()
return nil
}
// StartPings starts the pinging process to keep the websocket channel alive.
func (webSocketChannel *WebSocketChannel) StartPings() {
go func() {
for {
if webSocketChannel.IsOpen == false {
return
}
log.GetLogger().Debug("WebsocketChannel: Send ping. Message.")
webSocketChannel.writeLock.Lock()
err := webSocketChannel.Connection.WriteMessage(websocket.PingMessage, []byte("keepalive"))
webSocketChannel.writeLock.Unlock()
if err != nil {
webSocketChannel.Close()
log.GetLogger().Warnf("Error while sending websocket ping: %v", err)
return
}
time.Sleep(60*time.Second)
}
}()
}
// Close closes the corresponding connection.
func (webSocketChannel *WebSocketChannel) Close() error {
logger := log.GetLogger().WithField(
"wssurl", webSocketChannel.Url,
)
log.GetLogger().Info("Closing websocket channel connection to: " + webSocketChannel.Url)
if webSocketChannel.IsOpen == true {
// Send signal to stop receiving message
webSocketChannel.IsOpen = false
return NewWebsocketUtil(logger, nil).CloseConnection(webSocketChannel.Connection)
}
log.GetLogger().Debugf("Websocket channel connection to: " + webSocketChannel.Url + " is already Closed!")
return nil
}
// SendMessage sends a byte message through the websocket connection.
// Examples of message type are websocket.TextMessage or websocket.Binary
func (webSocketChannel *WebSocketChannel) SendMessage(input []byte, inputType int) error {
// log.GetLogger().Infoln("SendMessage: ", string(input))
if webSocketChannel.IsOpen == false {
return errors.New("Can't send message: Connection is closed.")
}
if len(input) < 1 {
return errors.New("Can't send message: Empty input.")
}
webSocketChannel.writeLock.Lock()
err := webSocketChannel.Connection.WriteMessage(inputType, input)
webSocketChannel.writeLock.Unlock()
return err
}