in src/datachannel/streaming.go [722:758]
func (dataChannel *DataChannel) ProcessIncomingMessageBufferItems(log log.T,
outputMessage message.ClientMessage) (err error) {
for {
bufferedStreamMessage := dataChannel.IncomingMessageBuffer.Messages[dataChannel.ExpectedSequenceNumber]
if bufferedStreamMessage.Content != nil {
log.Debugf("Process stream data message from IncomingMessageBuffer. "+
"Sequence Number: %d", bufferedStreamMessage.SequenceNumber)
if err := outputMessage.DeserializeClientMessage(log, bufferedStreamMessage.Content); err != nil {
log.Errorf("Cannot deserialize raw message with err: %v.", err)
return err
}
// Decrypt if encryption is enabled and payload type is output
if dataChannel.encryptionEnabled &&
(outputMessage.PayloadType == uint32(message.Output) ||
outputMessage.PayloadType == uint32(message.StdErr) ||
outputMessage.PayloadType == uint32(message.ExitCode)) {
outputMessage.Payload, err = dataChannel.encryption.Decrypt(log, outputMessage.Payload)
if err != nil {
log.Errorf("Unable to decrypt buffered message data payload, MessageType %s, "+
"PayloadType %d, err: %s.", outputMessage.MessageType, outputMessage.PayloadType, err)
return err
}
}
dataChannel.processOutputMessageWithHandlers(log, outputMessage)
dataChannel.ExpectedSequenceNumber = dataChannel.ExpectedSequenceNumber + 1
dataChannel.RemoveDataFromIncomingMessageBuffer(bufferedStreamMessage.SequenceNumber)
} else {
break
}
}
return
}