func()

in agent/channel/channel_websocket.go [57:206]


func (c *WebSocketChannel) StartChannel() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	errmsg := ""
	defer func() {
		if len(errmsg) > 0 {
			metrics.GetChannelFailEvent(
				metrics.EVENT_SUBCATEGORY_CHANNEL_WS,
				"errmsg", errmsg,
				"type", ChannelTypeStr(c.ChannelType),
			).ReportEvent()
		}
	}()
	if c.consecutiveConnectFailed >= wssCoolDownCount {
		if time.Now().Before(c.calmDownUntil) {
			return errors.New("ws channel is calming down")
		}
		c.consecutiveConnectFailed = 0
		log.GetLogger().Info("ws channel is not calm anymore")
	}
	host := util.GetServerHost()
	if host == "" {
		errmsg = "No available host"
		return errors.New("No available host")
	}

	url := "wss://" + host + WEBSOCKET_SERVER

	logger := log.GetLogger().WithField("url", url)
	header := http.Header{
		httpbase.UserAgentHeader: []string{httpbase.UserAgentValue},
	}
	if extraHeaders, err := requester.GetExtraHTTPHeaders(logger); extraHeaders != nil {
		for k, v := range extraHeaders {
			header.Add(k, v)
		}
	} else if err != nil {
		logger.WithError(err).Error("Failed to construct extra HTTP headers")
	}

	var MyDialer = &websocket.Dialer{
		Proxy:            requester.GetProxyFunc(logger),
		HandshakeTimeout: 45 * time.Second,
		TLSClientConfig: &tls.Config{
			RootCAs: requester.GetRootCAs(logger),
		},
	}
	var dialErr error
	var conn *websocket.Conn
	conn, _, dialErr = MyDialer.Dial(url, header)
	if dialErr != nil {
		var certificateErr *tls.CertificateVerificationError
		if errors.As(dialErr, &certificateErr) {
			logger.WithError(dialErr).Error("certificate error, reload certificate and retry")

			requester.AccumulateRootCAs(logger)(func(certPool *x509.CertPool) bool {
				MyDialer.TLSClientConfig.RootCAs = certPool
				if conn, _, dialErr = MyDialer.Dial(url, header); dialErr != nil {
					errmsg = fmt.Sprintf("dial ws channel errror:%s, url=%s", dialErr.Error(), url)
					return true
				} else {
					requester.UpdateRootCAs(logger, certPool)
					logger.Info("certificate updated")
					return false
				}
			})
		} else {
			errmsg = fmt.Sprintf("dial ws channel errror:%s, url=%s", dialErr.Error(), url)
		}
	}
	if dialErr != nil {
		c.consecutiveConnectFailed += 1
		if c.consecutiveConnectFailed >= wssCoolDownCount {
			c.calmDownUntil = time.Now().Add(time.Second * time.Duration(wssCoolDownTime))
			errmsg = fmt.Sprintf("dial ws channel errror:%s, url=%s, wss dial failed %d times "+
				"consecutivly, need calm down %d second",
				dialErr.Error(), url, c.consecutiveConnectFailed, wssCoolDownTime)
		} else {
			errmsg = fmt.Sprintf("dial ws channel errror:%s, url=%s", dialErr.Error(), url)
		}
		log.GetLogger().Errorln(dialErr)
		return dialErr
	}
	c.consecutiveConnectFailed = 0
	c.wskConn = conn
	logger.Infoln("Start websocket channel ok! url:", url)
	c.Working.Set()
	c.StartPings(time.Second * 60)
	go func() {
		defer func() {
			if msg := recover(); msg != nil {
				logger.Errorf("WebsocketChannel  run panic: %v", msg)
				logger.Errorf("%s: %s", msg, debug.Stack())
			}
		}()
		retryCount := 0
		for {
			if !c.Working.IsSet() {
				logger.Infoln("websocket channel is closed")
				break
			}
			messageType, message, err := c.wskConn.ReadMessage()
			if err != nil {
				time.Sleep(time.Duration(1) * time.Second)
				retryCount++
				if retryCount >= MAX_RETRY_COUNT {
					c.lock.Lock()
					defer c.lock.Unlock()
					c.wskConn.Close()
					c.Working.Clear()
					logger.Errorf("Reach the retry limit for receive messages. Error: %v", err.Error())
					report := clientreport.ClientReport{
						ReportType: "switch_channel_in_wsk",
						Info:       fmt.Sprintf("start:" + err.Error()),
					}
					clientreport.SendReport(report)
					go c.SwitchChannel()
					break
				}
				logger.Errorf(
					"An error happened when receiving the message. Retried times: %d, MessageType: %v, Error: %s",
					retryCount,
					messageType,
					err.Error())
			} else if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
				logger.Errorf("Invalid message type %d. ", messageType)

			} else {
				logger.Infof("wsk recv: %s", string(message))

				content := c.CallBack(string(message), ChannelWebsocketType)
				if content != "" {
					c.writeLock.Lock()
					err := c.wskConn.WriteMessage(websocket.TextMessage, []byte(content))
					c.writeLock.Unlock()
					if err != nil {
						metrics.GetChannelFailEvent(
							metrics.EVENT_SUBCATEGORY_CHANNEL_WS,
							"errormsg", fmt.Sprintf("websocket writing err:%s, content=%s", err.Error(), content),
							"type", ChannelTypeStr(c.ChannelType),
						).ReportEvent()
					}
				}

				retryCount = 0
			}
		}
	}()
	return nil
}