func()

in agent/session/shell/shell.go [226:280]


// InputStreamMessageHandler dispatches one message received on the input
// stream according to its MessageType:
//
//   - InputStreamDataMessage: the payload is handed to onInputStreamData.
//   - SetSizeDataMessage: the payload is decoded as JSON into SizeData and
//     applied with SetSize.
//   - StatusDataMessage: the first payload byte is a status code. Code 7
//     updates p.sendInterval from the speed (bps) carried in the remaining
//     bytes; code 5 calls p.exitFunc. Other codes are ignored.
//   - CloseDataChannel: calls p.exitFunc.
//
// Messages of any other type are ignored. While stdin/stdout are not yet set
// the message is dropped and nil is returned. A non-nil error is returned when
// the input data handler, the size decoding/resizing, or the speed parsing
// fails.
func (p *ShellPlugin) InputStreamMessageHandler(streamDataMessage message.Message) error {
	if p.stdin == nil || p.stdout == nil {
		// This is to handle scenario when cli/console starts sending size data but pty has not been started yet
		// Since packets are rejected, cli/console will resend these packets until pty starts successfully in separate thread
		p.logger.Error("Pty unavailable. Reject incoming message packet")
		return nil
	}

	switch streamDataMessage.MessageType {
	case message.InputStreamDataMessage:
		if err := p.onInputStreamData(streamDataMessage.Payload); err != nil {
			return err
		}

	case message.SetSizeDataMessage:
		var size SizeData
		if err := json.Unmarshal(streamDataMessage.Payload, &size); err != nil {
			p.logger.Errorf("Invalid size message: %s", err)
			return err
		}
		if err := p.SetSize(size.Cols, size.Rows); err != nil {
			p.logger.Errorf("Unable to set pty size: %s", err)
			return err
		}

	case message.StatusDataMessage:
		if len(streamDataMessage.Payload) == 0 {
			break
		}
		code, err := message.BytesToIntU(streamDataMessage.Payload[0:1])
		if err != nil {
			// An unparsable status code is logged but does not fail the handler.
			p.logger.Errorf("Parse status code err: %s", err)
			break
		}

		switch code {
		case 7: // set the agent's send rate
			// speed is expressed in bps
			speed, err := message.BytesToIntU(streamDataMessage.Payload[1:])
			// Check the error before looking at speed: the value is
			// meaningless when parsing failed, and testing speed == 0 first
			// would silently swallow the failure.
			if err != nil {
				p.logger.Errorf("Invalid flowLimit: %s", err)
				return err
			}
			if speed == 0 {
				// A zero speed leaves the current send interval untouched.
				break
			}
			// Packets per second at the requested rate. Integer division
			// truncates to 0 for any speed below 8*sendPackageSize bps, which
			// would make the division below panic, so clamp to the slowest
			// rate this formula can express: one packet per second.
			packetsPerSecond := speed / 8 / sendPackageSize
			if packetsPerSecond <= 0 {
				packetsPerSecond = 1
			}
			p.sendInterval = 1000 / packetsPerSecond
			p.logger.Infof("Set send speed, speed[%d]bps sendInterval[%d]ms\n", speed, p.sendInterval)
		case 5:
			p.logger.Info("Exit due to receiving a packet with a close status")
			p.exitFunc(errors.New(Ok))
		}

	case message.CloseDataChannel:
		p.logger.Info("Exit due to receiving CloseDataChannel packet")
		p.exitFunc(errors.New(Ok))
	}
	return nil
}