protected async Task ProcessDuplexMessageAsync()

in src/System.ServiceModel.Primitives/src/System/ServiceModel/Channels/ClientReliableDuplexSessionChannel.cs [237:612]


        /// <summary>
        /// Processes one received reliable-messaging message for this duplex session.
        /// In order, it: applies any acknowledgement info to the output connection, handles a
        /// sequenced (application) message or a TerminateSequence/CloseSequence request or response,
        /// raises a local fault if one was produced, schedules shutdown if required, and finally
        /// sends either a protocol response or a standalone acknowledgement when one is due.
        /// </summary>
        /// <param name="info">
        /// The WS-RM information for the received message. <c>info.Message</c> is closed when this
        /// method exits unless the message was handed to the delivery strategy for dispatch.
        /// </param>
        /// <returns>A task that completes once processing (including any send) has finished.</returns>
        protected async Task ProcessDuplexMessageAsync(WsrmMessageInfo info)
        {
            // Stays true unless the message is enqueued for delivery below; the finally block
            // closes info.Message whenever this is still true (including on early return).
            bool closeMessage = true;

            try
            {
                bool wsrmFeb2005 = Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
                bool wsrm11 = Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
                bool final = false;

                // Apply the acknowledgement carried by this message (if any) to the output connection.
                if (OutputConnection != null && info.AcknowledgementInfo != null)
                {
                    // The Final flag is only honored for WS-RM 1.1.
                    final = wsrm11 && info.AcknowledgementInfo.Final;

                    // -1 is passed through when flow control is disabled.
                    int bufferRemaining = -1;

                    if (Settings.FlowControlEnabled)
                        bufferRemaining = info.AcknowledgementInfo.BufferRemaining;

                    OutputConnection.ProcessTransferred(info.AcknowledgementInfo.Ranges, bufferRemaining);
                }

                OnRemoteActivity();

                // Outcome flags/values collected by the branches below and acted upon afterwards.
                bool tryAckNow = (info.AckRequestedInfo != null);   // message carries AckRequested info
                bool forceAck = false;          // attach the full ack header to the response message
                bool terminate = false;         // terminate the input connection after responding
                bool scheduleShutdown = false;  // schedule ShutdownCallback once processing succeeds
                ulong oldAckVersion = 0;        // snapshot of _ackVersion taken when a message is enqueued
                WsrmFault fault = null;         // local fault to raise instead of responding
                Message message = null;         // protocol response / ack to send
                Exception remoteFaultException = null;  // reported via OnRemoteFault after the response is sent

                if (info.SequencedMessageInfo != null)
                {
                    bool needDispatch = false;

                    using (ThisAsyncLock.TakeLock())
                    {
                        // Nothing to do once the channel is aborted or faulted; finally still closes the message.
                        if (Aborted || State == CommunicationState.Faulted)
                        {
                            return;
                        }

                        long sequenceNumber = info.SequencedMessageInfo.SequenceNumber;
                        // The LastMessage marker is only honored for the February 2005 version.
                        bool isLast = wsrmFeb2005 && info.SequencedMessageInfo.LastMessage;

                        // Sequence number rejected by the input connection: Feb 2005 faults locally,
                        // otherwise a SequenceClosed fault message is sent back with a forced ack.
                        if (!_inputConnection.IsValid(sequenceNumber, isLast))
                        {
                            if (wsrmFeb2005)
                            {
                                fault = new LastMessageNumberExceededFault(ReliableSession.InputID);
                            }
                            else
                            {
                                message = new SequenceClosedFault(ReliableSession.InputID).CreateMessage(
                                    Settings.MessageVersion, Settings.ReliableMessagingVersion);
                                forceAck = true;

                                OnMessageDropped();
                            }
                        }
                        // Sequence number is already in the received ranges: drop it and ack right away.
                        else if (_inputConnection.Ranges.Contains(sequenceNumber))
                        {
                            OnMessageDropped();
                            tryAckNow = true;
                        }
                        // Feb 2005 LastMessage action: the number is merged into the ranges but the
                        // message is not enqueued for delivery (closeMessage stays true).
                        else if (wsrmFeb2005 && info.Action == WsrmFeb2005Strings.LastMessageAction)
                        {
                            _inputConnection.Merge(sequenceNumber, isLast);

                            if (_inputConnection.AllAdded)
                            {
                                scheduleShutdown = true;

                                if (OutputConnection.CheckForTermination())
                                {
                                    ReliableSession.CloseSession();
                                }
                            }
                        }
                        // A new message arrived while the channel is closing: Feb 2005 faults locally,
                        // otherwise a SequenceClosed fault message is sent back with a forced ack.
                        else if (State == CommunicationState.Closing)
                        {
                            if (wsrmFeb2005)
                            {
                                fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
                                    SRP.SequenceTerminatedSessionClosedBeforeDone,
                                    SRP.SessionClosedBeforeDone);
                            }
                            else
                            {
                                message = new SequenceClosedFault(ReliableSession.InputID).CreateMessage(
                                    Settings.MessageVersion, Settings.ReliableMessagingVersion);
                                forceAck = true;

                                OnMessageDropped();
                            }
                        }
                        // In the unordered case we accept no more than MaxSequenceRanges ranges to limit the
                        // serialized ack size and the amount of memory taken by the ack ranges. In the
                        // ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
                        // threat.
                        else if (_deliveryStrategy.CanEnqueue(sequenceNumber)
                            && (Settings.Ordered || _inputConnection.CanMerge(sequenceNumber)))
                        {
                            _inputConnection.Merge(sequenceNumber, isLast);
                            needDispatch = _deliveryStrategy.Enqueue(info.Message, sequenceNumber);
                            // The message now belongs to the delivery strategy; do not close it in finally.
                            closeMessage = false;
                            // Remember the ack version at enqueue time; compared again before creating
                            // a standalone ack to see whether one has gone out in the meantime.
                            oldAckVersion = _ackVersion;
                            _pendingAcknowledgements++;

                            if (_inputConnection.AllAdded)
                            {
                                scheduleShutdown = true;

                                if (OutputConnection.CheckForTermination())
                                {
                                    ReliableSession.CloseSession();
                                }
                            }
                        }
                        // Cannot be enqueued/merged right now: drop the message.
                        else
                        {
                            OnMessageDropped();
                        }

                        // if (ack now && we enqueued && an ack has been sent since we enqueued (and thus
                        // carries the sequence number of the message we just processed)) then we don't
                        // need to ack again.
                        // (That check is the oldAckVersion/_ackVersion comparison made further down, just
                        // before the standalone ack is created.)
                        // Ack immediately once the last message is known or the pending-ack count has
                        // reached MaxTransferWindowSize.
                        if (_inputConnection.IsLastKnown || _pendingAcknowledgements == Settings.MaxTransferWindowSize)
                            tryAckNow = true;

                        // Arm the acknowledgement timer (once) when an ack is wanted now, or when acks are
                        // pending and no fault is about to be raised.
                        bool startTimer = tryAckNow || (_pendingAcknowledgements > 0 && fault == null);
                        if (startTimer && !_acknowledgementScheduled)
                        {
                            _acknowledgementScheduled = true;
                            _acknowledgementTimer.Set(Settings.AcknowledgementInterval);
                        }
                    }

                    // Dispatch is invoked after the lock has been released.
                    if (needDispatch)
                    {
                        Dispatch();
                    }
                }
                // Feb 2005 TerminateSequence: Terminate() returning false is treated as an early terminate.
                else if (wsrmFeb2005 && info.TerminateSequenceInfo != null)
                {
                    bool isTerminateEarly;

                    using (ThisAsyncLock.TakeLock())
                    {
                        isTerminateEarly = !_inputConnection.Terminate();
                    }

                    if (isTerminateEarly)
                    {
                        fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
                            SRP.SequenceTerminatedEarlyTerminateSequence,
                            SRP.EarlyTerminateSequence);
                    }
                }
                // WS-RM 1.1 protocol messages.
                else if (wsrm11)
                {
                    // TerminateSequence addressed to the input sequence, or any CloseSequence.
                    if (((info.TerminateSequenceInfo != null) && (info.TerminateSequenceInfo.Identifier == ReliableSession.InputID))
                        || (info.CloseSequenceInfo != null))
                    {
                        bool isTerminate = info.TerminateSequenceInfo != null;
                        WsrmRequestInfo requestInfo = isTerminate
                            ? (WsrmRequestInfo)info.TerminateSequenceInfo
                            : (WsrmRequestInfo)info.CloseSequenceInfo;
                        long last = isTerminate ? info.TerminateSequenceInfo.LastMsgNumber : info.CloseSequenceInfo.LastMsgNumber;

                        // Stop here when validation fails.
                        // NOTE(review): presumably ValidateWsrmRequest reports the failure itself - confirm.
                        if (!WsrmUtilities.ValidateWsrmRequest(ReliableSession, requestInfo, Binder, null))
                        {
                            return;
                        }

                        bool isLastLargeEnough = true;
                        bool isLastConsistent = true;

                        using (ThisAsyncLock.TakeLock())
                        {
                            // Last message number not known yet: record the one carried by this request.
                            if (!_inputConnection.IsLastKnown)
                            {
                                if (isTerminate)
                                {
                                    if (_inputConnection.SetTerminateSequenceLast(last, out isLastLargeEnough))
                                    {
                                        scheduleShutdown = true;
                                    }
                                    // Last was acceptable in size but could not be set: still respond, then
                                    // surface this as a remote fault at the end of the method.
                                    else if (isLastLargeEnough)
                                    {
                                        remoteFaultException = new ProtocolException(SRP.EarlyTerminateSequence);
                                    }
                                }
                                else
                                {
                                    // For CloseSequence a failed set is reported as "last too small".
                                    scheduleShutdown = _inputConnection.SetCloseSequenceLast(last);
                                    isLastLargeEnough = scheduleShutdown;
                                }

                                if (scheduleShutdown)
                                {
                                    ReliableSession.SetFinalAck(_inputConnection.Ranges);
                                    if (_terminateRequestor != null)
                                    {
                                        ReliableSession.CloseSession();
                                    }

                                    _deliveryStrategy.Dispose();
                                }
                            }
                            // Last already known: the number in this request must match it.
                            else
                            {
                                isLastConsistent = (last == _inputConnection.Last);

                                // Have seen CloseSequence already, TerminateSequence means cleanup.
                                if (isTerminate && isLastConsistent && _inputConnection.IsSequenceClosed)
                                {
                                    terminate = true;
                                }
                            }
                        }

                        // Turn the outcome into either a local fault or the matching response message.
                        if (!isLastLargeEnough)
                        {
                            string faultString = SRP.SequenceTerminatedSmallLastMsgNumber;
                            string exceptionString = SRP.SmallLastMsgNumberExceptionString;
                            fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID, faultString, exceptionString);
                        }
                        else if (!isLastConsistent)
                        {
                            string faultString = SRP.SequenceTerminatedInconsistentLastMsgNumber;
                            string exceptionString = SRP.InconsistentLastMsgNumberExceptionString;
                            fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID, faultString, exceptionString);
                        }
                        else
                        {
                            message = isTerminate
                                ? WsrmUtilities.CreateTerminateResponseMessage(Settings.MessageVersion,
                                requestInfo.MessageId, ReliableSession.InputID)
                                : WsrmUtilities.CreateCloseSequenceResponse(Settings.MessageVersion,
                                requestInfo.MessageId, ReliableSession.InputID);
                            forceAck = true;
                        }
                    }
                    else if (info.TerminateSequenceInfo != null)    // Identifier == OutputID
                    {
                        fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
                            SRP.SequenceTerminatedUnsupportedTerminateSequence,
                            SRP.UnsupportedTerminateSequenceExceptionString);
                    }
                    else if (info.TerminateSequenceResponseInfo != null)
                    {
                        fault = ProcessCloseOrTerminateSequenceResponse(false, info);
                    }
                    else if (info.CloseSequenceResponseInfo != null)
                    {
                        fault = ProcessCloseOrTerminateSequenceResponse(true, info);
                    }
                    // A final acknowledgement that arrived without a close/terminate response.
                    else if (final)
                    {
                        // No close request is outstanding, so a final ack is a protocol violation.
                        if (_closeRequestor == null)
                        {
                            string exceptionString = SRP.UnsupportedCloseExceptionString;
                            string faultString = SRP.SequenceTerminatedUnsupportedClose;

                            fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.OutputID, faultString,
                                exceptionString);
                        }
                        else
                        {
                            fault = WsrmUtilities.ValidateFinalAck(ReliableSession, info, OutputConnection.Last);

                            // A valid final ack is handed to the pending close request.
                            if (fault == null)
                            {
                                _closeRequestor.SetInfo(info);
                            }
                        }
                    }
                    // A WS-RM header fault: only UnknownSequence is expected here, and only while a
                    // terminate request is outstanding; it is handed to that request.
                    else if (info.WsrmHeaderFault != null)
                    {
                        if (!(info.WsrmHeaderFault is UnknownSequenceFault))
                        {
                            throw Fx.AssertAndThrow("Fault must be UnknownSequence fault.");
                        }

                        if (_terminateRequestor == null)
                        {
                            throw Fx.AssertAndThrow("In wsrm11, if we start getting UnknownSequence, terminateRequestor cannot be null.");
                        }

                        _terminateRequestor.SetInfo(info);
                    }
                }

                // A local fault takes precedence: report it and skip shutdown scheduling and sending.
                if (fault != null)
                {
                    ReliableSession.OnLocalFault(fault.CreateException(), fault, null);
                    return;
                }

                if (scheduleShutdown)
                {
                    ActionItem.Schedule(ShutdownCallback, null);
                }

                // A protocol response is already prepared: attach acknowledgement information to it.
                if (message != null)
                {
                    if (forceAck)
                    {
                        WsrmUtilities.AddAcknowledgementHeader(Settings.ReliableMessagingVersion, message,
                            ReliableSession.InputID, _inputConnection.Ranges, true, GetBufferRemaining());
                    }
                    else if (tryAckNow)
                    {
                        AddPendingAcknowledgements(message);
                    }
                }
                // No response prepared but an ack is due: build a standalone acknowledgement.
                else if (tryAckNow)
                {
                    using (ThisAsyncLock.TakeLock())
                    {
                        // The ack version changed since this message was enqueued, so no further ack is
                        // needed (see the comment in the sequenced-message branch above).
                        if (oldAckVersion != 0 && oldAckVersion != _ackVersion)
                            return;

                        // The ack is going out now, so the scheduled timer is no longer needed.
                        if (_acknowledgementScheduled)
                        {
                            _acknowledgementTimer.Cancel();
                            _acknowledgementScheduled = false;
                        }
                        _pendingAcknowledgements = 0;
                    }

                    message = CreateAcknowledgmentMessage();
                }

                if (message != null)
                {
                    // The outgoing message is disposed after the send attempt, whether or not it was sent.
                    using (message)
                    {
                        // Send only if the guard admits us; Exit is always paired with a successful Enter.
                        if (_guard.Enter())
                        {
                            try
                            {
                                await Binder.SendAsync(message, DefaultSendTimeout);
                            }
                            finally
                            {
                                _guard.Exit();
                            }
                        }
                    }
                }

                // Deferred cleanup for a TerminateSequence that followed a CloseSequence; done after
                // the response has been sent.
                if (terminate)
                {
                    using (ThisAsyncLock.TakeLock())
                    {
                        _inputConnection.Terminate();
                    }
                }

                // Report the early-terminate condition only after the response has been sent.
                if (remoteFaultException != null)
                {
                    ReliableSession.OnRemoteFault(remoteFaultException);
                }
            }
            finally
            {
                // Close the incoming message unless it was handed to the delivery strategy.
                if (closeMessage)
                {
                    info.Message.Close();
                }
            }
        }