in src/communicator/websocketchannel.go [139:190]
func (webSocketChannel *WebSocketChannel) Open(log log.T) error {
// initialize the write mutex
webSocketChannel.writeLock = &sync.Mutex{}
ws, err := websocketutil.NewWebsocketUtil(log, nil).OpenConnection(webSocketChannel.Url)
if err != nil {
return err
}
webSocketChannel.Connection = ws
webSocketChannel.IsOpen = true
webSocketChannel.StartPings(log, config.PingTimeInterval)
// spin up a different routine to listen to the incoming traffic
go func() {
defer func() {
if msg := recover(); msg != nil {
log.Errorf("WebsocketChannel listener run panic: %v", msg)
}
}()
retryCount := 0
for {
if webSocketChannel.IsOpen == false {
log.Debugf("Ending the channel listening routine since the channel is closed: %s",
webSocketChannel.Url)
break
}
messageType, rawMessage, err := webSocketChannel.Connection.ReadMessage()
if err != nil {
retryCount++
if retryCount >= config.RetryAttempt {
log.Errorf("Reach the retry limit %v for receive messages.", config.RetryAttempt)
webSocketChannel.OnError(err)
break
}
log.Debugf("An error happened when receiving the message. Retried times: %v, Error: %v, Messagetype: %v",
retryCount,
err.Error(),
messageType)
} else if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
// We only accept text messages which are interpreted as UTF-8 or binary encoded text.
log.Errorf("Invalid message type. We only accept UTF-8 or binary encoded text. Message type: %v", messageType)
} else {
retryCount = 0
webSocketChannel.OnMessage(rawMessage)
}
}
}()
return nil
}