func()

in agent/session/port/port.go [218:286]


// InputStreamMessageHandler handles one message received for this port session
// and dispatches on its MessageType.
//
// If p.conn is nil on entry, the handler first waits up to
// waitLocalConnTimeoutSecond seconds for a signal on p.connReady and returns
// an error if none arrives in time.
//
// Message types handled:
//   - message.InputStreamDataMessage: the payload is written to p.conn. When
//     p.needReconnect is set, the target host/port is dialed again first and
//     p.reconnectSign is signalled on success.
//   - message.StatusDataMessage: the first payload byte is parsed as a status
//     code. Code 7 updates p.sendInterval from the speed carried in the rest
//     of the payload; code 5 calls p.exitFunc with Ok. A status code that
//     cannot be parsed is logged and otherwise ignored.
//   - message.CloseDataChannel: calls p.exitFunc with Ok.
//
// Any other message type is ignored. The returned error is non-nil when the
// connection does not become ready, the reconnect dial fails, the write to
// the connection fails, or the speed value of a code 7 status is invalid.
func (p *PortPlugin) InputStreamMessageHandler(streamDataMessage message.Message) error {
	if p.conn == nil {
		p.logger.Infof("InputStreamMessageHandler: connect not ready, wait %d seconds...", waitLocalConnTimeoutSecond)
		timer := time.NewTimer(time.Duration(waitLocalConnTimeoutSecond) * time.Second)
		// Release the timer as soon as the handler returns instead of leaving
		// it pending until it fires when connReady wins the select.
		defer timer.Stop()
		select {
		case <-p.connReady:
			p.logger.Infoln("InputStreamMessageHandler: connect is ready")
		case <-timer.C:
			p.logger.Infof("InputStreamMessageHandler: connect still not ready after %d seconds", waitLocalConnTimeoutSecond)
			return fmt.Errorf("connection with target host port not ready")
		}
	}

	switch streamDataMessage.MessageType {
	case message.InputStreamDataMessage:
		// Reconnect only when a data message is received.
		if p.needReconnect.CompareAndSwap(true, false) {
			targetHostPort := net.JoinHostPort(p.targetHost, strconv.Itoa(p.portNumber))
			p.logger.Infof("InputStreamMessageHandler:Reconnect to %s", targetHostPort)
			var err error
			p.conn, err = net.Dial("tcp", targetHostPort)
			if err != nil {
				p.logger.WithError(err).Error("Reconnect failed")
				p.exitFunc(errors.New(Read_port_failed))
				return err
			}
			p.logger.Info("Reconnect succeed")
			p.reconnectSign <- struct{}{}
		}
		if _, err := p.conn.Write(streamDataMessage.Payload); err != nil {
			p.logger.Errorf("Unable to write to port, err: %v.", err)
			return err
		}
		if util.IsVerboseMode() {
			p.logger.Infoln("write data:", string(streamDataMessage.Payload))
		}
	case message.StatusDataMessage:
		p.logger.Info("message type: ", streamDataMessage.MessageType)
		if len(streamDataMessage.Payload) == 0 {
			break
		}
		code, err := message.BytesToIntU(streamDataMessage.Payload[0:1])
		p.logger.WithError(err).Info("message status code: ", code)
		if err != nil {
			p.logger.Errorf("Parse status code err: %s", err)
			break
		}
		switch code {
		case 7: // Set the agent's send rate.
			speed, err := message.BytesToIntU(streamDataMessage.Payload[1:]) // speed is in bps
			// NOTE(review): a zero speed is ignored before err is inspected, so
			// a parse failure that yields 0 is silently dropped. Kept as-is to
			// preserve existing behavior - confirm this is intended.
			if speed == 0 {
				break
			}
			if err != nil {
				p.logger.Errorf("Invalid flowLimit: %s", err)
				return err
			}
			// Integer division truncates: any speed below 8*sendPackageSize
			// bps yields 0 packages per second, and dividing 1000 by that
			// would panic. Clamp to one package per second, which is the
			// slowest rate this formula can express.
			packagesPerSecond := speed / 8 / sendPackageSize
			if packagesPerSecond < 1 {
				p.logger.Infof("Requested speed[%d]bps is below one package per second, clamp to one package per second", speed)
				packagesPerSecond = 1
			}
			p.sendInterval = 1000 / packagesPerSecond
			p.logger.Infof("Set send speed, channelId[%s] speed[%d]bps sendInterval[%d]ms\n", p.id, speed, p.sendInterval)
		case 5:
			p.logger.Info("Exit due to receiving a packet with a close status")
			p.exitFunc(errors.New(Ok))
		}
	case message.CloseDataChannel:
		p.logger.Info("Exit due to receiving CloseDataChannel packet")
		p.exitFunc(errors.New(Ok))
	}
	return nil
}