func()

in src/datachannel/streaming.go [608:717]


func (dataChannel *DataChannel) HandleOutputMessage(
	log log.T,
	outputMessage message.ClientMessage,
	rawMessage []byte) (err error) {

	// On receiving expected stream data message, send acknowledgement, process it and increment expected sequence number by 1.
	// Further process messages from IncomingMessageBuffer
	if outputMessage.SequenceNumber == dataChannel.ExpectedSequenceNumber {

		switch message.PayloadType(outputMessage.PayloadType) {
		case message.HandshakeRequestPayloadType:
			{
				if err = SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
					return err
				}

				// PayloadType is HandshakeRequest so we call our own handler instead of the provided handler
				log.Debugf("Processing HandshakeRequest message %s", outputMessage)
				if err = dataChannel.handleHandshakeRequest(log, outputMessage); err != nil {
					log.Errorf("Unable to process incoming data payload, MessageType %s, "+
						"PayloadType HandshakeRequestPayloadType, err: %s.", outputMessage.MessageType, err)
					return err
				}
			}
		case message.HandshakeCompletePayloadType:
			{
				if err = SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
					return err
				}

				if err = dataChannel.handleHandshakeComplete(log, outputMessage); err != nil {
					log.Errorf("Unable to process incoming data payload, MessageType %s, "+
						"PayloadType HandshakeCompletePayloadType, err: %s.", outputMessage.MessageType, err)
					return err
				}
			}
		case message.EncChallengeRequest:
			{
				if err = SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
					return err
				}

				if err = dataChannel.handleEncryptionChallengeRequest(log, outputMessage); err != nil {
					log.Errorf("Unable to process incoming data payload, MessageType %s, "+
						"PayloadType EncChallengeRequest, err: %s.", outputMessage.MessageType, err)
					return err
				}
			}
		default:

			log.Tracef("Process new incoming stream data message. Sequence Number: %d", outputMessage.SequenceNumber)

			// Decrypt if encryption is enabled and payload type is output
			if dataChannel.encryptionEnabled &&
				(outputMessage.PayloadType == uint32(message.Output) ||
					outputMessage.PayloadType == uint32(message.StdErr) ||
					outputMessage.PayloadType == uint32(message.ExitCode)) {
				outputMessage.Payload, err = dataChannel.encryption.Decrypt(log, outputMessage.Payload)
				if err != nil {
					log.Errorf("Unable to decrypt incoming data payload, MessageType %s, "+
						"PayloadType %d, err: %s.", outputMessage.MessageType, outputMessage.PayloadType, err)
					return err
				}
			}

			isHandlerReady, err := dataChannel.processOutputMessageWithHandlers(log, outputMessage)
			if err != nil {
				log.Error("Failed to process stream data message: %s", err.Error())
				return err
			}
			if !isHandlerReady {
				log.Warnf("Stream data message with sequence number %d is not processed as session handler is not ready.", outputMessage.SequenceNumber)
				return nil
			} else {
				// Acknowledge outputMessage only if session specific handler is ready
				if err := SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
					return err
				}
			}
		}
		dataChannel.ExpectedSequenceNumber = dataChannel.ExpectedSequenceNumber + 1
		return dataChannel.ProcessIncomingMessageBufferItems(log, outputMessage)
	} else {
		log.Debugf("Unexpected sequence message received. Received Sequence Number: %d. Expected Sequence Number: %d",
			outputMessage.SequenceNumber, dataChannel.ExpectedSequenceNumber)

		// If incoming message sequence number is greater then expected sequence number and IncomingMessageBuffer has capacity,
		// add message to IncomingMessageBuffer and send acknowledgement
		if outputMessage.SequenceNumber > dataChannel.ExpectedSequenceNumber {
			log.Debugf("Received Sequence Number %d is higher than Expected Sequence Number %d, adding to IncomingMessageBuffer",
				outputMessage.SequenceNumber, dataChannel.ExpectedSequenceNumber)
			if len(dataChannel.IncomingMessageBuffer.Messages) < dataChannel.IncomingMessageBuffer.Capacity {
				if err = SendAcknowledgeMessageCall(log, dataChannel, outputMessage); err != nil {
					return err
				}

				streamingMessage := StreamingMessage{
					rawMessage,
					outputMessage.SequenceNumber,
					time.Now(),
					new(int),
				}

				//Add message to buffer for future processing
				dataChannel.AddDataToIncomingMessageBuffer(streamingMessage)
			}
		}
	}
	return nil
}