func()

in src/communicator/websocketchannel.go [139:190]


func (webSocketChannel *WebSocketChannel) Open(log log.T) error {
	// initialize the write mutex
	webSocketChannel.writeLock = &sync.Mutex{}

	ws, err := websocketutil.NewWebsocketUtil(log, nil).OpenConnection(webSocketChannel.Url)
	if err != nil {
		return err
	}
	webSocketChannel.Connection = ws
	webSocketChannel.IsOpen = true
	webSocketChannel.StartPings(log, config.PingTimeInterval)

	// spin up a different routine to listen to the incoming traffic
	go func() {
		defer func() {
			if msg := recover(); msg != nil {
				log.Errorf("WebsocketChannel listener run panic: %v", msg)
			}
		}()

		retryCount := 0
		for {
			if webSocketChannel.IsOpen == false {
				log.Debugf("Ending the channel listening routine since the channel is closed: %s",
					webSocketChannel.Url)
				break
			}

			messageType, rawMessage, err := webSocketChannel.Connection.ReadMessage()
			if err != nil {
				retryCount++
				if retryCount >= config.RetryAttempt {
					log.Errorf("Reach the retry limit %v for receive messages.", config.RetryAttempt)
					webSocketChannel.OnError(err)
					break
				}
				log.Debugf("An error happened when receiving the message. Retried times: %v, Error: %v, Messagetype: %v",
					retryCount,
					err.Error(),
					messageType)
			} else if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
				// We only accept text messages which are interpreted as UTF-8 or binary encoded text.
				log.Errorf("Invalid message type. We only accept UTF-8 or binary encoded text. Message type: %v", messageType)

			} else {
				retryCount = 0
				webSocketChannel.OnMessage(rawMessage)
			}
		}
	}()
	return nil
}