in agent/session/plugin/client/client.go [351:427]
// readLoop receives messages from c.Conn and dispatches them until the
// session ends, returning the exit code produced by openPoison or die.
//
// Each iteration starts one reader goroutine that performs a single blocking
// c.Conn.ReadMessage call, deserializes the frame into a message.Message and
// hands the outcome back over a channel. The loop then waits for whichever
// happens first: that outcome arriving, or c.poison becoming ready.
//
// Dispatch by message type:
//   - message.OutputStreamDataMessage: the payload is written to c.Output,
//     c.real_connected is set, and c.lastDataTimestampOffset is refreshed with
//     the whole seconds elapsed since c.startTimestamp.
//   - message.StatusDataChannel: the payload is passed to
//     c.ProcessStatusDataChannel; a non-nil result ends the loop.
//   - anything else is ignored.
//
// wg.Done is called when the function returns.
func (c *Client) readLoop(wg *sync.WaitGroup) int {
	defer wg.Done()
	fname := "readLoop"

	// readResult is the outcome of one read attempt.
	type readResult struct {
		msg message.Message
		err error
	}

	// Capacity 1 is enough for the reader to never block on its send: a new
	// reader is only started after the previous result has been received, so
	// at most one send is ever outstanding. This lets the loop return on
	// poison without closing the channel under a sender (which would make the
	// reader panic with "send on closed channel").
	results := make(chan readResult, 1)

	for {
		go func() {
			// Safety net for panics raised while reading or decoding.
			// NOTE(review): a panic here means no result is delivered, so the
			// loop below then only ends once c.poison becomes ready.
			defer func() {
				if r := recover(); r != nil {
					logrus.Debug("readLoop: reader goroutine panicked: ", r)
				}
			}()

			_, data, err := c.Conn.ReadMessage()
			if c.verbosemode {
				log.GetLogger().Infoln("read msg: ", string(data))
			}

			streamDataMessage := message.Message{}
			if err != nil {
				// Include the error here: if the select below takes the poison
				// branch, this is the only place it gets logged.
				log.GetLogger().Errorln("read msg err", err)
				openPoison(fname, c.poison)
			} else if err = streamDataMessage.Deserialize(data); err != nil {
				// The decode error travels to the loop through the result.
				log.GetLogger().Errorf("Cannot deserialize raw message, err: %v.", err)
			}

			if c.verbosemode {
				log.GetLogger().Infoln("read msg num : ", streamDataMessage.SequenceNumber)
			}
			results <- readResult{msg: streamDataMessage, err: err}
		}()

		select {
		case <-c.poison:
			// The pending reader (if any) finishes on its own: its send lands
			// in the channel buffer once ReadMessage returns.
			return die(fname, c.poison)

		case res := <-results:
			if res.err != nil {
				log.GetLogger().Errorln("read msg err", res.err)
				if _, ok := res.err.(*websocket.CloseError); !ok {
					log.GetLogger().Warnf("c.Conn.ReadMessage: %v", res.err)
				}
				return openPoison(fname, c.poison)
			}

			if err := res.msg.Validate(); err != nil {
				log.GetLogger().Errorln("An error has occured, msg is invalid", err)
				return openPoison(fname, c.poison)
			}

			switch res.msg.MessageType {
			case message.OutputStreamDataMessage: // data
				c.real_connected = true
				c.Output.Write(res.msg.Payload)
				c.lastDataTimestampOffset.Store(int32(time.Since(c.startTimestamp).Seconds()))
			case message.StatusDataChannel: // status
				if c.ProcessStatusDataChannel(res.msg.Payload) != nil {
					return openPoison(fname, c.poison)
				}
			default:
				// Other protocol message types are intentionally ignored.
			}
		}
	}
}