in src/datachannel/streaming.go [608:717]
func (dataChannel *DataChannel) HandleOutputMessage(
log log.T,
outputMessage message.ClientMessage,
rawMessage []byte) (err error) {
// On receiving expected stream data message, send acknowledgement, process it and increment expected sequence number by 1.
// Further process messages from IncomingMessageBuffer
if outputMessage.SequenceNumber == dataChannel.ExpectedSequenceNumber {
switch message.PayloadType(outputMessage.PayloadType) {
case message.HandshakeRequestPayloadType:
{
if err = SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
return err
}
// PayloadType is HandshakeRequest so we call our own handler instead of the provided handler
log.Debugf("Processing HandshakeRequest message %s", outputMessage)
if err = dataChannel.handleHandshakeRequest(log, outputMessage); err != nil {
log.Errorf("Unable to process incoming data payload, MessageType %s, "+
"PayloadType HandshakeRequestPayloadType, err: %s.", outputMessage.MessageType, err)
return err
}
}
case message.HandshakeCompletePayloadType:
{
if err = SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
return err
}
if err = dataChannel.handleHandshakeComplete(log, outputMessage); err != nil {
log.Errorf("Unable to process incoming data payload, MessageType %s, "+
"PayloadType HandshakeCompletePayloadType, err: %s.", outputMessage.MessageType, err)
return err
}
}
case message.EncChallengeRequest:
{
if err = SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
return err
}
if err = dataChannel.handleEncryptionChallengeRequest(log, outputMessage); err != nil {
log.Errorf("Unable to process incoming data payload, MessageType %s, "+
"PayloadType EncChallengeRequest, err: %s.", outputMessage.MessageType, err)
return err
}
}
default:
log.Tracef("Process new incoming stream data message. Sequence Number: %d", outputMessage.SequenceNumber)
// Decrypt if encryption is enabled and payload type is output
if dataChannel.encryptionEnabled &&
(outputMessage.PayloadType == uint32(message.Output) ||
outputMessage.PayloadType == uint32(message.StdErr) ||
outputMessage.PayloadType == uint32(message.ExitCode)) {
outputMessage.Payload, err = dataChannel.encryption.Decrypt(log, outputMessage.Payload)
if err != nil {
log.Errorf("Unable to decrypt incoming data payload, MessageType %s, "+
"PayloadType %d, err: %s.", outputMessage.MessageType, outputMessage.PayloadType, err)
return err
}
}
isHandlerReady, err := dataChannel.processOutputMessageWithHandlers(log, outputMessage)
if err != nil {
log.Error("Failed to process stream data message: %s", err.Error())
return err
}
if !isHandlerReady {
log.Warnf("Stream data message with sequence number %d is not processed as session handler is not ready.", outputMessage.SequenceNumber)
return nil
} else {
// Acknowledge outputMessage only if session specific handler is ready
if err := SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
return err
}
}
}
dataChannel.ExpectedSequenceNumber = dataChannel.ExpectedSequenceNumber + 1
return dataChannel.ProcessIncomingMessageBufferItems(log, outputMessage)
} else {
log.Debugf("Unexpected sequence message received. Received Sequence Number: %d. Expected Sequence Number: %d",
outputMessage.SequenceNumber, dataChannel.ExpectedSequenceNumber)
// If incoming message sequence number is greater then expected sequence number and IncomingMessageBuffer has capacity,
// add message to IncomingMessageBuffer and send acknowledgement
if outputMessage.SequenceNumber > dataChannel.ExpectedSequenceNumber {
log.Debugf("Received Sequence Number %d is higher than Expected Sequence Number %d, adding to IncomingMessageBuffer",
outputMessage.SequenceNumber, dataChannel.ExpectedSequenceNumber)
if len(dataChannel.IncomingMessageBuffer.Messages) < dataChannel.IncomingMessageBuffer.Capacity {
if err = SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
return err
}
streamingMessage := StreamingMessage{
rawMessage,
outputMessage.SequenceNumber,
time.Now(),
new(int),
}
//Add message to buffer for future processing
dataChannel.AddDataToIncomingMessageBuffer(streamingMessage)
}
}
}
return nil
}