in src/System.ServiceModel.Primitives/src/System/ServiceModel/Channels/ClientReliableDuplexSessionChannel.cs [237:612]
/// <summary>
/// Handles one incoming WS-ReliableMessaging message on the duplex session channel.
/// In order, the method: applies any acknowledgement info to the output connection;
/// processes the message as a sequenced application message, a TerminateSequence /
/// CloseSequence request, a close/terminate response, a final acknowledgement or a header
/// fault; and then acts on the outcome (local fault, shutdown scheduling, sending a reply or
/// acknowledgement, terminating the input connection, reporting a remote fault).
/// </summary>
/// <param name="info">
/// Parsed WS-RM view of the received message. <c>info.Message</c> is closed in the
/// <c>finally</c> block unless the message was enqueued on the delivery strategy.
/// </param>
/// <returns>A task that completes once processing, including any send, has finished.</returns>
protected async Task ProcessDuplexMessageAsync(WsrmMessageInfo info)
{
// Stays true unless the message is enqueued on _deliveryStrategy below; the finally
// block closes info.Message only while this is still true.
bool closeMessage = true;
try
{
bool wsrmFeb2005 = Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
bool wsrm11 = Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
bool final = false;
// Step 1: feed any acknowledgement carried by this message into the output connection.
if (OutputConnection != null && info.AcknowledgementInfo != null)
{
// The Final flag is only honoured when running WS-RM 1.1.
final = wsrm11 && info.AcknowledgementInfo.Final;
// -1 is passed when flow control is disabled (presumably meaning "not reported" - confirm
// against ProcessTransferred).
int bufferRemaining = -1;
if (Settings.FlowControlEnabled)
bufferRemaining = info.AcknowledgementInfo.BufferRemaining;
OutputConnection.ProcessTransferred(info.AcknowledgementInfo.Ranges, bufferRemaining);
}
OnRemoteActivity();
// Outcome flags and objects; they are set by the branches below and acted on after them.
// tryAckNow starts out true when the message carries an AckRequested header.
bool tryAckNow = (info.AckRequestedInfo != null);
bool forceAck = false;
bool terminate = false;
bool scheduleShutdown = false;
ulong oldAckVersion = 0;
WsrmFault fault = null;
Message message = null;
Exception remoteFaultException = null;
// Step 2a: sequenced (application) message.
if (info.SequencedMessageInfo != null)
{
bool needDispatch = false;
using (ThisAsyncLock.TakeLock())
{
// Nothing to do on an aborted/faulted channel; the finally block still closes the message.
if (Aborted || State == CommunicationState.Faulted)
{
return;
}
long sequenceNumber = info.SequencedMessageInfo.SequenceNumber;
// The LastMessage marker is only considered for the February 2005 protocol version.
bool isLast = wsrmFeb2005 && info.SequencedMessageInfo.LastMessage;
// Sequence number rejected by the input connection: Feb 2005 raises a local fault,
// otherwise a SequenceClosed fault message is sent back with a forced acknowledgement.
if (!_inputConnection.IsValid(sequenceNumber, isLast))
{
if (wsrmFeb2005)
{
fault = new LastMessageNumberExceededFault(ReliableSession.InputID);
}
else
{
message = new SequenceClosedFault(ReliableSession.InputID).CreateMessage(
Settings.MessageVersion, Settings.ReliableMessagingVersion);
forceAck = true;
OnMessageDropped();
}
}
// Sequence number is already in the received ranges: drop it and acknowledge right away.
else if (_inputConnection.Ranges.Contains(sequenceNumber))
{
OnMessageDropped();
tryAckNow = true;
}
// Feb 2005 LastMessage action: the number is merged into the ranges but the message is not
// enqueued for delivery (closeMessage stays true, so it is closed in the finally block).
else if (wsrmFeb2005 && info.Action == WsrmFeb2005Strings.LastMessageAction)
{
_inputConnection.Merge(sequenceNumber, isLast);
if (_inputConnection.AllAdded)
{
scheduleShutdown = true;
if (OutputConnection.CheckForTermination())
{
ReliableSession.CloseSession();
}
}
}
// A new message arrived while the channel is Closing: Feb 2005 faults the session,
// otherwise a SequenceClosed fault message is returned and the message is dropped.
else if (State == CommunicationState.Closing)
{
if (wsrmFeb2005)
{
fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
SRP.SequenceTerminatedSessionClosedBeforeDone,
SRP.SessionClosedBeforeDone);
}
else
{
message = new SequenceClosedFault(ReliableSession.InputID).CreateMessage(
Settings.MessageVersion, Settings.ReliableMessagingVersion);
forceAck = true;
OnMessageDropped();
}
}
// In the unordered case we accept no more than MaxSequenceRanges ranges to limit the
// serialized ack size and the amount of memory taken by the ack ranges. In the
// ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
// threat.
else if (_deliveryStrategy.CanEnqueue(sequenceNumber)
&& (Settings.Ordered || _inputConnection.CanMerge(sequenceNumber)))
{
_inputConnection.Merge(sequenceNumber, isLast);
needDispatch = _deliveryStrategy.Enqueue(info.Message, sequenceNumber);
// The message has been handed to the delivery strategy, so it must not be closed here.
closeMessage = false;
// Snapshot of the ack version at enqueue time; compared again before sending a standalone ack.
oldAckVersion = _ackVersion;
_pendingAcknowledgements++;
if (_inputConnection.AllAdded)
{
scheduleShutdown = true;
if (OutputConnection.CheckForTermination())
{
ReliableSession.CloseSession();
}
}
}
// Neither accepted nor handled by an earlier branch: drop the message.
else
{
OnMessageDropped();
}
// if (ack now && we enqueued && an ack has been sent since we enqueued (and thus
// carries the sequence number of the message we just processed)) then we don't
// need to ack again.
// Acknowledge immediately once the last message number is known or the number of pending
// acknowledgements equals MaxTransferWindowSize.
if (_inputConnection.IsLastKnown || _pendingAcknowledgements == Settings.MaxTransferWindowSize)
tryAckNow = true;
// Arm the acknowledgement timer (once) when an ack is wanted now, or when acks are pending
// and no fault was produced.
bool startTimer = tryAckNow || (_pendingAcknowledgements > 0 && fault == null);
if (startTimer && !_acknowledgementScheduled)
{
_acknowledgementScheduled = true;
_acknowledgementTimer.Set(Settings.AcknowledgementInterval);
}
}
// Dispatch is invoked after the lock scope has been exited.
if (needDispatch)
{
Dispatch();
}
}
// Step 2b: Feb 2005 TerminateSequence. A false result from Terminate() is treated as an
// early terminate and faults the session.
else if (wsrmFeb2005 && info.TerminateSequenceInfo != null)
{
bool isTerminateEarly;
using (ThisAsyncLock.TakeLock())
{
isTerminateEarly = !_inputConnection.Terminate();
}
if (isTerminateEarly)
{
fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
SRP.SequenceTerminatedEarlyTerminateSequence,
SRP.EarlyTerminateSequence);
}
}
// Step 2c: WS-RM 1.1 control messages.
else if (wsrm11)
{
// TerminateSequence addressed to the input sequence, or any CloseSequence.
if (((info.TerminateSequenceInfo != null) && (info.TerminateSequenceInfo.Identifier == ReliableSession.InputID))
|| (info.CloseSequenceInfo != null))
{
// When both are present, TerminateSequence takes precedence over CloseSequence.
bool isTerminate = info.TerminateSequenceInfo != null;
WsrmRequestInfo requestInfo = isTerminate
? (WsrmRequestInfo)info.TerminateSequenceInfo
: (WsrmRequestInfo)info.CloseSequenceInfo;
long last = isTerminate ? info.TerminateSequenceInfo.LastMsgNumber : info.CloseSequenceInfo.LastMsgNumber;
// Stop processing when request validation fails; this method takes no further action
// (whatever ValidateWsrmRequest does on failure is not visible here).
if (!WsrmUtilities.ValidateWsrmRequest(ReliableSession, requestInfo, Binder, null))
{
return;
}
bool isLastLargeEnough = true;
bool isLastConsistent = true;
using (ThisAsyncLock.TakeLock())
{
// First time the last message number is being learned for the input sequence.
if (!_inputConnection.IsLastKnown)
{
if (isTerminate)
{
if (_inputConnection.SetTerminateSequenceLast(last, out isLastLargeEnough))
{
scheduleShutdown = true;
}
// Last number was acceptable but the call still returned false: recorded as an early
// TerminateSequence and reported through OnRemoteFault after the response is sent.
else if (isLastLargeEnough)
{
remoteFaultException = new ProtocolException(SRP.EarlyTerminateSequence);
}
}
else
{
// For CloseSequence, a false result is treated as "last number too small".
scheduleShutdown = _inputConnection.SetCloseSequenceLast(last);
isLastLargeEnough = scheduleShutdown;
}
if (scheduleShutdown)
{
ReliableSession.SetFinalAck(_inputConnection.Ranges);
if (_terminateRequestor != null)
{
ReliableSession.CloseSession();
}
_deliveryStrategy.Dispose();
}
}
else
{
// Last number already known: the one in this request must match it.
isLastConsistent = (last == _inputConnection.Last);
// Have seen CloseSequence already, TerminateSequence means cleanup.
if (isTerminate && isLastConsistent && _inputConnection.IsSequenceClosed)
{
terminate = true;
}
}
}
// Translate the validation outcome into a local fault or into the protocol response.
if (!isLastLargeEnough)
{
string faultString = SRP.SequenceTerminatedSmallLastMsgNumber;
string exceptionString = SRP.SmallLastMsgNumberExceptionString;
fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID, faultString, exceptionString);
}
else if (!isLastConsistent)
{
string faultString = SRP.SequenceTerminatedInconsistentLastMsgNumber;
string exceptionString = SRP.InconsistentLastMsgNumberExceptionString;
fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID, faultString, exceptionString);
}
else
{
// The response always carries a forced acknowledgement (added further below).
message = isTerminate
? WsrmUtilities.CreateTerminateResponseMessage(Settings.MessageVersion,
requestInfo.MessageId, ReliableSession.InputID)
: WsrmUtilities.CreateCloseSequenceResponse(Settings.MessageVersion,
requestInfo.MessageId, ReliableSession.InputID);
forceAck = true;
}
}
// A TerminateSequence that did not match the input sequence is answered with a fault.
else if (info.TerminateSequenceInfo != null) // Identifier == OutputID
{
fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.InputID,
SRP.SequenceTerminatedUnsupportedTerminateSequence,
SRP.UnsupportedTerminateSequenceExceptionString);
}
// Responses to requests issued locally; the boolean argument is false for a terminate
// response and true for a close response.
else if (info.TerminateSequenceResponseInfo != null)
{
fault = ProcessCloseOrTerminateSequenceResponse(false, info);
}
else if (info.CloseSequenceResponseInfo != null)
{
fault = ProcessCloseOrTerminateSequenceResponse(true, info);
}
// Final acknowledgement: a fault unless a close request is outstanding (_closeRequestor set);
// otherwise it is validated and, if valid, handed to the close requestor.
else if (final)
{
if (_closeRequestor == null)
{
string exceptionString = SRP.UnsupportedCloseExceptionString;
string faultString = SRP.SequenceTerminatedUnsupportedClose;
fault = SequenceTerminatedFault.CreateProtocolFault(ReliableSession.OutputID, faultString,
exceptionString);
}
else
{
fault = WsrmUtilities.ValidateFinalAck(ReliableSession, info, OutputConnection.Last);
if (fault == null)
{
_closeRequestor.SetInfo(info);
}
}
}
// Header fault: only UnknownSequence is expected here, and only while a terminate request is
// outstanding; anything else trips an assertion.
else if (info.WsrmHeaderFault != null)
{
if (!(info.WsrmHeaderFault is UnknownSequenceFault))
{
throw Fx.AssertAndThrow("Fault must be UnknownSequence fault.");
}
if (_terminateRequestor == null)
{
throw Fx.AssertAndThrow("In wsrm11, if we start getting UnknownSequence, terminateRequestor cannot be null.");
}
_terminateRequestor.SetInfo(info);
}
}
// Step 3: act on the outcome. A fault is reported to the session and ends processing.
if (fault != null)
{
ReliableSession.OnLocalFault(fault.CreateException(), fault, null);
return;
}
// ShutdownCallback is scheduled through ActionItem rather than invoked inline.
if (scheduleShutdown)
{
ActionItem.Schedule(ShutdownCallback, null);
}
// A reply message exists: attach acknowledgement information to it. forceAck adds the
// acknowledgement header directly; otherwise pending acks are added only when tryAckNow is set.
if (message != null)
{
if (forceAck)
{
WsrmUtilities.AddAcknowledgementHeader(Settings.ReliableMessagingVersion, message,
ReliableSession.InputID, _inputConnection.Ranges, true, GetBufferRemaining());
}
else if (tryAckNow)
{
AddPendingAcknowledgements(message);
}
}
// No reply message but an acknowledgement is due: build a standalone acknowledgement.
else if (tryAckNow)
{
using (ThisAsyncLock.TakeLock())
{
// oldAckVersion was captured when this call enqueued the message. If _ackVersion has
// changed since then, return without sending another ack - presumably one already went out.
if (oldAckVersion != 0 && oldAckVersion != _ackVersion)
return;
if (_acknowledgementScheduled)
{
_acknowledgementTimer.Cancel();
_acknowledgementScheduled = false;
}
_pendingAcknowledgements = 0;
}
message = CreateAcknowledgmentMessage();
}
// Send the reply or acknowledgement. The using block disposes the message whether or not it
// is sent; when _guard.Enter() returns false the send is skipped.
if (message != null)
{
using (message)
{
if (_guard.Enter())
{
try
{
await Binder.SendAsync(message, DefaultSendTimeout);
}
finally
{
_guard.Exit();
}
}
}
}
// Deferred termination of the input connection, performed after the response was handled above.
if (terminate)
{
using (ThisAsyncLock.TakeLock())
{
_inputConnection.Terminate();
}
}
// Surface the early-terminate condition recorded above to the session.
if (remoteFaultException != null)
{
ReliableSession.OnRemoteFault(remoteFaultException);
}
}
finally
{
// Close the incoming message unless it was handed to the delivery strategy.
if (closeMessage)
{
info.Message.Close();
}
}
}