in agent/session/port/port.go [218:286]
// InputStreamMessageHandler handles one message delivered to the port plugin.
//
// If p.conn is still nil it first waits up to waitLocalConnTimeoutSecond
// seconds for p.connReady, and returns an error when that wait times out.
// It then dispatches on streamDataMessage.MessageType:
//   - InputStreamDataMessage: re-dials the target when needReconnect is set,
//     then writes the payload to p.conn.
//   - StatusDataMessage: reads the first payload byte as a status code;
//     code 7 updates p.sendInterval from a speed given in bps, code 5 calls
//     p.exitFunc.
//   - CloseDataChannel: calls p.exitFunc.
//
// Any other message type is ignored. A non-nil error is returned when the
// connection is not ready in time, when reconnecting or writing fails, or
// when a non-zero speed is accompanied by a parse error.
func (p *PortPlugin) InputStreamMessageHandler(streamDataMessage message.Message) error {
	if p.conn == nil {
		p.logger.Infof("InputStreamMessageHandler: connect not ready, wait %d seconds...", waitLocalConnTimeoutSecond)
		timer := time.NewTimer(time.Duration(waitLocalConnTimeoutSecond) * time.Second)
		select {
		case <-p.connReady:
			// Release the timer immediately instead of leaving it pending
			// until it would have fired.
			timer.Stop()
			p.logger.Infoln("InputStreamMessageHandler: connect is ready")
		case <-timer.C:
			p.logger.Infof("InputStreamMessageHandler: connect still not ready after %d seconds", waitLocalConnTimeoutSecond)
			return fmt.Errorf("connection with target host port not ready")
		}
	}

	switch streamDataMessage.MessageType {
	case message.InputStreamDataMessage:
		// Reconnect only when a data message is received.
		if p.needReconnect.CompareAndSwap(true, false) {
			targetHostPort := net.JoinHostPort(p.targetHost, strconv.Itoa(p.portNumber))
			p.logger.Infof("InputStreamMessageHandler:Reconnect to %s", targetHostPort)
			// Dial into a local first: net.Dial returns a nil connection on
			// failure, and storing that in p.conn would let a later message
			// call Write on a nil connection.
			conn, err := net.Dial("tcp", targetHostPort)
			if err != nil {
				p.logger.WithError(err).Error("Reconnect failed")
				p.exitFunc(errors.New(Read_port_failed))
				return err
			}
			p.conn = conn
			p.logger.Info("Reconnect succeed")
			p.reconnectSign <- struct{}{}
		}
		if _, err := p.conn.Write(streamDataMessage.Payload); err != nil {
			p.logger.Errorf("Unable to write to port, err: %v.", err)
			return err
		}
		if util.IsVerboseMode() {
			p.logger.Infoln("write data:", string(streamDataMessage.Payload))
		}

	case message.StatusDataMessage:
		p.logger.Info("message type: ", streamDataMessage.MessageType)
		if len(streamDataMessage.Payload) == 0 {
			break
		}
		// The first payload byte carries the status code.
		code, err := message.BytesToIntU(streamDataMessage.Payload[0:1])
		p.logger.WithError(err).Info("message status code: ", code)
		if err != nil {
			p.logger.Errorf("Parse status code err: %s", err)
			break
		}
		switch code {
		case 7: // Set the agent's send rate.
			// The remaining payload bytes hold the speed; its unit is bps.
			speed, err := message.BytesToIntU(streamDataMessage.Payload[1:])
			// A zero speed leaves the current interval untouched. This check
			// intentionally stays ahead of the error check, so a parse
			// failure that also yields 0 is ignored rather than reported.
			if speed == 0 {
				break
			}
			if err != nil {
				p.logger.Errorf("Invalid flowLimit: %s", err)
				return err
			}
			// Integer division truncates to 0 for speeds below
			// 8*sendPackageSize bps, which would make the final division
			// panic. Clamp to one package per second in that case.
			packagesPerSecond := speed / 8 / sendPackageSize
			if packagesPerSecond <= 0 {
				packagesPerSecond = 1
			}
			p.sendInterval = 1000 / packagesPerSecond
			p.logger.Infof("Set send speed, channelId[%s] speed[%d]bps sendInterval[%d]ms\n", p.id, speed, p.sendInterval)
		case 5: // Close status.
			p.logger.Info("Exit due to receiving a packet with a close status")
			p.exitFunc(errors.New(Ok))
		}

	case message.CloseDataChannel:
		p.logger.Info("Exit due to receiving CloseDataChannel packet")
		p.exitFunc(errors.New(Ok))
	}
	return nil
}