in agent/channel/channel_websocket.go [57:206]
// StartChannel dials the websocket server and, on success, starts the ping
// loop and a background goroutine that reads incoming messages and hands them
// to c.CallBack.
//
// c.lock is held for the whole call. Once wssCoolDownCount consecutive dials
// have failed, further calls are rejected until c.calmDownUntil has passed.
// When the first dial fails with a TLS certificate verification error, the
// root CAs are reloaded through requester.AccumulateRootCAs and the dial is
// retried.
//
// It returns nil when the connection is established, otherwise the error that
// prevented it. Failures other than the cool-down rejection are also reported
// as a channel-failure metric event.
func (c *WebSocketChannel) StartChannel() error {
	c.lock.Lock()
	defer c.lock.Unlock()

	// errmsg is reported as a channel-failure metric on return when non-empty.
	// It must only be set on paths that actually return a failure.
	errmsg := ""
	defer func() {
		if len(errmsg) > 0 {
			metrics.GetChannelFailEvent(
				metrics.EVENT_SUBCATEGORY_CHANNEL_WS,
				"errmsg", errmsg,
				"type", ChannelTypeStr(c.ChannelType),
			).ReportEvent()
		}
	}()

	if c.consecutiveConnectFailed >= wssCoolDownCount {
		if time.Now().Before(c.calmDownUntil) {
			// errmsg stays empty here, so no metric is emitted while cooling down.
			return errors.New("ws channel is calming down")
		}
		c.consecutiveConnectFailed = 0
		log.GetLogger().Info("ws channel is not calm anymore")
	}

	host := util.GetServerHost()
	if host == "" {
		errmsg = "No available host"
		return errors.New("No available host")
	}

	url := "wss://" + host + WEBSOCKET_SERVER
	logger := log.GetLogger().WithField("url", url)

	header := http.Header{
		httpbase.UserAgentHeader: []string{httpbase.UserAgentValue},
	}
	// A failure to build the extra headers is logged but does not abort the dial.
	if extraHeaders, err := requester.GetExtraHTTPHeaders(logger); extraHeaders != nil {
		for k, v := range extraHeaders {
			header.Add(k, v)
		}
	} else if err != nil {
		logger.WithError(err).Error("Failed to construct extra HTTP headers")
	}

	dialer := &websocket.Dialer{
		Proxy:            requester.GetProxyFunc(logger),
		HandshakeTimeout: 45 * time.Second,
		TLSClientConfig: &tls.Config{
			RootCAs: requester.GetRootCAs(logger),
		},
	}

	var dialErr error
	var conn *websocket.Conn
	conn, _, dialErr = dialer.Dial(url, header)
	if dialErr != nil {
		var certificateErr *tls.CertificateVerificationError
		if errors.As(dialErr, &certificateErr) {
			logger.WithError(dialErr).Error("certificate error, reload certificate and retry")
			// The callback returns true after a failed dial and false after a
			// successful one.
			// NOTE(review): presumably true asks AccumulateRootCAs for another
			// candidate pool - confirm against its implementation.
			//
			// errmsg is deliberately not touched here: a failed attempt may be
			// followed by a successful one, and a stale message would make the
			// deferred reporter emit a failure event for a working connection.
			requester.AccumulateRootCAs(logger)(func(certPool *x509.CertPool) bool {
				dialer.TLSClientConfig.RootCAs = certPool
				if conn, _, dialErr = dialer.Dial(url, header); dialErr != nil {
					return true
				}
				requester.UpdateRootCAs(logger, certPool)
				logger.Info("certificate updated")
				return false
			})
		}
	}

	// Single place where the final dial failure is recorded and reported.
	if dialErr != nil {
		c.consecutiveConnectFailed += 1
		if c.consecutiveConnectFailed >= wssCoolDownCount {
			c.calmDownUntil = time.Now().Add(time.Second * time.Duration(wssCoolDownTime))
			errmsg = fmt.Sprintf("dial ws channel errror:%s, url=%s, wss dial failed %d times "+
				"consecutivly, need calm down %d second",
				dialErr.Error(), url, c.consecutiveConnectFailed, wssCoolDownTime)
		} else {
			errmsg = fmt.Sprintf("dial ws channel errror:%s, url=%s", dialErr.Error(), url)
		}
		log.GetLogger().Errorln(dialErr)
		return dialErr
	}

	c.consecutiveConnectFailed = 0
	c.wskConn = conn
	logger.Infoln("Start websocket channel ok! url:", url)
	c.Working.Set()
	c.StartPings(time.Second * 60)

	// Reader goroutine: runs until c.Working is cleared or the read retry
	// limit is reached.
	go func() {
		defer func() {
			if msg := recover(); msg != nil {
				logger.Errorf("WebsocketChannel run panic: %v", msg)
				logger.Errorf("%s: %s", msg, debug.Stack())
			}
		}()

		retryCount := 0
		for {
			if !c.Working.IsSet() {
				logger.Infoln("websocket channel is closed")
				break
			}

			messageType, message, err := c.wskConn.ReadMessage()
			if err != nil {
				time.Sleep(time.Duration(1) * time.Second)
				retryCount++
				if retryCount >= MAX_RETRY_COUNT {
					// The loop is left right after this branch, so the
					// deferred unlock runs as soon as the goroutine returns.
					c.lock.Lock()
					defer c.lock.Unlock()
					c.wskConn.Close()
					c.Working.Clear()
					logger.Errorf("Reach the retry limit for receive messages. Error: %v", err.Error())
					report := clientreport.ClientReport{
						ReportType: "switch_channel_in_wsk",
						// Plain concatenation: the error text must not be
						// interpreted as a format string.
						Info: "start:" + err.Error(),
					}
					clientreport.SendReport(report)
					// Started asynchronously while c.lock is still held here.
					go c.SwitchChannel()
					break
				}
				logger.Errorf(
					"An error happened when receiving the message. Retried times: %d, MessageType: %v, Error: %s",
					retryCount,
					messageType,
					err.Error())
			} else if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
				logger.Errorf("Invalid message type %d. ", messageType)
			} else {
				logger.Infof("wsk recv: %s", string(message))
				content := c.CallBack(string(message), ChannelWebsocketType)
				if content != "" {
					// Writes to the connection are serialized through c.writeLock.
					c.writeLock.Lock()
					writeErr := c.wskConn.WriteMessage(websocket.TextMessage, []byte(content))
					c.writeLock.Unlock()
					if writeErr != nil {
						// NOTE(review): this event uses the key "errormsg" while
						// the dial-failure event above uses "errmsg" - confirm
						// whether the difference is intended.
						metrics.GetChannelFailEvent(
							metrics.EVENT_SUBCATEGORY_CHANNEL_WS,
							"errormsg", fmt.Sprintf("websocket writing err:%s, content=%s", writeErr.Error(), content),
							"type", ChannelTypeStr(c.ChannelType),
						).ReportEvent()
					}
				}
				// A successfully handled message resets the read retry counter.
				retryCount = 0
			}
		}
	}()

	return nil
}