func()

in agent/session/plugin/client/client.go [351:427]


func (c *Client) readLoop(wg *sync.WaitGroup) int {
	defer wg.Done()
	fname := "readLoop"

	type MessageNonBlocking struct {
		Msg message.Message
		Err error
	}
	msgChan := make(chan MessageNonBlocking)

	for {
		go func() {
			defer func() {
				if r := recover(); r != nil {
					logrus.Debug("readLoop returned so msgChan closed", r)
				}
			}()
			_, data, err := c.Conn.ReadMessage()
			if c.verbosemode {
				log.GetLogger().Infoln("read msg: ", string(data))
			}
			streamDataMessage := message.Message{}
			if err == nil {
				if err = streamDataMessage.Deserialize(data); err != nil {
					log.GetLogger().Errorf("Cannot deserialize raw message, err: %v.", err)
				}
			} else {
				log.GetLogger().Errorln("read msg err")
				openPoison(fname, c.poison)
			}

			if c.verbosemode {
				log.GetLogger().Infoln("read msg num : ", streamDataMessage.SequenceNumber)
			}

			msgChan <- MessageNonBlocking{Msg: streamDataMessage, Err: err}
			// time.Sleep(time.Second * 1)
			// msgChan <- MessageNonBlocking{Data:  []byte("c"), Err: nil}
		}()

		select {
		case <-c.poison:
			close(msgChan)
			return die(fname, c.poison)
		case msg := <-msgChan:
			if msg.Err != nil {
				log.GetLogger().Errorln("read msg err", msg.Err)
				if _, ok := msg.Err.(*websocket.CloseError); !ok {
					log.GetLogger().Warnf("c.Conn.ReadMessage: %v", msg.Err)
				}

				return openPoison(fname, c.poison)
			}
			if msg.Msg.Validate() != nil {

				log.GetLogger().Errorln("An error has occured, msg is invalid")
				return openPoison(fname, c.poison)
			}

			switch msg.Msg.MessageType {
			case message.OutputStreamDataMessage: // data
				c.real_connected = true
				c.Output.Write(msg.Msg.Payload)
				c.lastDataTimestampOffset.Store(int32(time.Since(c.startTimestamp).Seconds()))
				break
			case message.StatusDataChannel: // data
				if c.ProcessStatusDataChannel(msg.Msg.Payload) != nil {
					return openPoison(fname, c.poison)
				}
				break
			default:
				// logrus.Warnf("Unhandled protocol message")
			}
		}
	}
	return 0
}