func()

in src/datachannel/streaming.go [722:758]


// ProcessIncomingMessageBufferItems drains consecutive entries from
// IncomingMessageBuffer, starting at the current ExpectedSequenceNumber.
//
// On each pass it looks up the buffered entry keyed by ExpectedSequenceNumber.
// If that entry has nil Content the loop ends and nil is returned. Otherwise
// the entry's Content is deserialized into outputMessage, the payload is
// decrypted when encryption is enabled and the payload type is Output, StdErr
// or ExitCode, the message is handed to processOutputMessageWithHandlers,
// ExpectedSequenceNumber is advanced by one, and the entry is removed from the
// buffer.
//
// A deserialization or decryption failure is logged and returned immediately;
// in that case ExpectedSequenceNumber is not advanced and the entry stays in
// the buffer.
func (dataChannel *DataChannel) ProcessIncomingMessageBufferItems(log log.T,
	outputMessage message.ClientMessage) (err error) {
	for {
		buffered := dataChannel.IncomingMessageBuffer.Messages[dataChannel.ExpectedSequenceNumber]
		if buffered.Content == nil {
			// Nothing usable is buffered for the next expected sequence number.
			return nil
		}

		log.Debugf("Process stream data message from IncomingMessageBuffer. "+
			"Sequence Number: %d", buffered.SequenceNumber)

		deserializeErr := outputMessage.DeserializeClientMessage(log, buffered.Content)
		if deserializeErr != nil {
			log.Errorf("Cannot deserialize raw message with err: %v.", deserializeErr)
			return deserializeErr
		}

		// Decrypt if encryption is enabled and payload type is output
		payloadType := outputMessage.PayloadType
		isDecryptablePayload := payloadType == uint32(message.Output) ||
			payloadType == uint32(message.StdErr) ||
			payloadType == uint32(message.ExitCode)
		if dataChannel.encryptionEnabled && isDecryptablePayload {
			plainPayload, decryptErr := dataChannel.encryption.Decrypt(log, outputMessage.Payload)
			if decryptErr != nil {
				log.Errorf("Unable to decrypt buffered message data payload, MessageType %s, "+
					"PayloadType %d, err: %s.", outputMessage.MessageType, outputMessage.PayloadType, decryptErr)
				return decryptErr
			}
			outputMessage.Payload = plainPayload
		}

		dataChannel.processOutputMessageWithHandlers(log, outputMessage)

		// Advance the expected sequence number before dropping the consumed entry.
		dataChannel.ExpectedSequenceNumber++
		dataChannel.RemoveDataFromIncomingMessageBuffer(buffered.SequenceNumber)
	}
}