in agent/session/shell/shell.go [226:280]
// InputStreamMessageHandler dispatches one message received from the
// cli/console to the shell session, based on its MessageType:
//
//   - InputStreamDataMessage: the payload is passed to onInputStreamData.
//   - SetSizeDataMessage: the payload is decoded as a JSON SizeData and
//     applied through SetSize.
//   - StatusDataMessage: the first payload byte is a status code. Code 7
//     updates the send interval from the requested speed, code 5 ends the
//     session through exitFunc. Other codes are ignored.
//   - CloseDataChannel: the session is ended through exitFunc.
//
// Messages of any other type are ignored.
//
// It returns nil when the message was handled, ignored, or dropped because
// the pty is not available yet. It returns the underlying error when the
// input data cannot be processed, the size payload is invalid, the pty size
// cannot be set, or the flow-limit value cannot be parsed.
func (p *ShellPlugin) InputStreamMessageHandler(streamDataMessage message.Message) error {
	if p.stdin == nil || p.stdout == nil {
		// This is to handle scenario when cli/console starts sending size data but pty has not been started yet
		// Since packets are rejected, cli/console will resend these packets until pty starts successfully in separate thread
		p.logger.Error("Pty unavailable. Reject incoming message packet")
		return nil
	}

	switch streamDataMessage.MessageType {
	case message.InputStreamDataMessage:
		// log.GetLogger().Traceln("Input message received: ", streamDataMessage.Payload)
		if err := p.onInputStreamData(streamDataMessage.Payload); err != nil {
			return err
		}

	case message.SetSizeDataMessage:
		var size SizeData
		if err := json.Unmarshal(streamDataMessage.Payload, &size); err != nil {
			p.logger.Errorf("Invalid size message: %s", err)
			return err
		}
		// log.GetLogger().Tracef("Resize data received: cols: %d, rows: %d", size.Cols, size.Rows)
		if err := p.SetSize(size.Cols, size.Rows); err != nil {
			p.logger.Errorf("Unable to set pty size: %s", err)
			return err
		}

	case message.StatusDataMessage:
		if len(streamDataMessage.Payload) == 0 {
			break
		}
		// The first byte carries the status code; the rest is code-specific data.
		code, err := message.BytesToIntU(streamDataMessage.Payload[0:1])
		if err != nil {
			// An unparsable status code is logged and otherwise ignored.
			p.logger.Errorf("Parse status code err: %s", err)
			break
		}
		switch code {
		case 7: // Set the agent's send rate.
			// speed is expressed in bps.
			speed, err := message.BytesToIntU(streamDataMessage.Payload[1:])
			// Check the error before looking at the value so that a failed
			// parse is reported instead of being mistaken for "speed == 0".
			if err != nil {
				p.logger.Errorf("Invalid flowLimit: %s", err)
				return err
			}
			if speed == 0 {
				// A speed of 0 leaves the current send interval unchanged.
				break
			}
			// Number of whole packages that may be sent per second. Integer
			// division yields 0 for speeds below 8*sendPackageSize bps, which
			// would make the division below panic; clamp to one package per
			// second, the slowest rate this scheme can express.
			packagesPerSecond := speed / 8 / sendPackageSize
			if packagesPerSecond < 1 {
				packagesPerSecond = 1
			}
			p.sendInterval = 1000 / packagesPerSecond
			p.logger.Infof("Set send speed, speed[%d]bps sendInterval[%d]ms\n", speed, p.sendInterval)
		case 5: // Close status: the peer asked to end the session.
			p.logger.Info("Exit due to receiving a packet with a close status")
			p.exitFunc(errors.New(Ok))
		}

	case message.CloseDataChannel:
		p.logger.Info("Exit due to receiving CloseDataChannel packet")
		p.exitFunc(errors.New(Ok))
	}
	return nil
}