src/Proton/Engine/Implementation/ProtonSender.cs (326 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Logging;
using Apache.Qpid.Proton.Types.Transport;
using Apache.Qpid.Proton.Utilities;
namespace Apache.Qpid.Proton.Engine.Implementation
{
/// <summary>
/// Proton Sender link implementation which manages the state of the Sender end
/// of an attached link and provides resources for sending deliveries and managing
/// the state of sent deliveries.
/// </summary>
public sealed class ProtonSender : ProtonLink<ISender>, ISender
{
// Logger shared by all sender instances.
private static readonly IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ProtonSender>();

// Outgoing window of the owning session, captured in the constructor.
private readonly ProtonSessionOutgoingWindow sessionWindow;

// Deliveries tracked until they are settled, keyed by their delivery id.
private readonly LinkedSplayedDictionary<uint, ProtonOutgoingDelivery> unsettled =
    new LinkedSplayedDictionary<uint, ProtonOutgoingDelivery>();

// Link level handler used when a delivery has no state-updated handler of its own.
private Action<IOutgoingDelivery> deliveryUpdatedEventHandler;

// Delivery id claimed for the in-progress delivery; null when none is in progress.
private uint? currentDeliveryId;

// Cached sendable flag, recomputed on credit, session window and link state changes.
private bool sendable;

// Optional generator used by Next() to assign a tag to each new delivery.
private IDeliveryTagGenerator autoTagGenerator;

// The delivery currently being written, or null once it completes or is aborted.
private IOutgoingDelivery current;
/// <summary>
/// Creates a new sender link owned by the given session.
/// </summary>
/// <param name="session">The parent session; its outgoing window is retained by this sender.</param>
/// <param name="name">The link name, forwarded to the base link.</param>
/// <param name="creditState">The link credit state, forwarded to the base link.</param>
public ProtonSender(ProtonSession session, string name, ProtonLinkCreditState creditState)
    : base(session, name, creditState)
{
    sessionWindow = session.OutgoingWindow;
}
#region Proton Sender API Implementation
/// <summary>
/// The role of this link end, which is always <c>Role.Sender</c>.
/// </summary>
public override Role Role
{
    get { return Role.Sender; }
}
/// <summary>
/// The credit value currently held by this link's credit state.
/// </summary>
public override uint Credit
{
    get { return CreditState.Credit; }
}
/// <summary>
/// Indicates whether this sender may currently send. The result is true only when
/// the cached sendable flag is set and the session outgoing window is sendable.
/// Reading this property also stores the combined result back into the cached flag.
/// </summary>
public bool IsSendable
{
    get
    {
        // The session window is only consulted while the cached flag is still set.
        sendable = sendable && sessionWindow.IsSendable;
        return sendable;
    }
}
/// <summary>
/// Indicates whether the link credit state is currently flagged as draining.
/// </summary>
public override bool IsDraining
{
    get { return CreditState.IsDrain; }
}
/// <summary>
/// Reports the link as drained. When the credit state is in drain mode and still
/// holds credit, the outstanding credit is cleared, the delivery count is advanced
/// by the cleared amount, a Flow is written through the session, and the drain flag
/// is then cleared. When those conditions are not met nothing is written.
/// </summary>
/// <returns>This sender instance.</returns>
/// <remarks>
/// <c>CheckLinkOperable</c> runs first and is expected to throw when the link cannot
/// be used (its implementation lives in the base link).
/// </remarks>
public ISender Drained()
{
    CheckLinkOperable("Cannot report link drained.");

    ProtonLinkCreditState state = CreditState;
    if (state.IsDrain && state.HasCredit)
    {
        uint drained = state.Credit;

        state.ClearCredit();
        state.IncrementDeliveryCount(drained);

        // The credit just changed, so re-evaluate the cached sendable flag exactly as every
        // other credit update in this class does; otherwise IsSendable could keep reporting
        // true after the credit has been given up.
        sendable = Credit > 0 && sessionWindow.IsSendable;

        session.WriteFlow(this);

        // Drain is cleared only after the flow has been written so the written flow still
        // carries the drain flag (see HandleDecorateOfOutgoingFlow).
        state.ClearDrain();
    }

    return this;
}
/// <summary>
/// Applies a disposition to every tracked unsettled delivery accepted by the filter.
/// For each match the given local state is applied when non-null, the delivery is
/// locally settled when requested, and the change is handed to the session outgoing
/// window. Settled deliveries are dropped from the unsettled tracking map.
/// </summary>
/// <param name="filter">Predicate selecting the deliveries to update; must not be null.</param>
/// <param name="state">The delivery state to apply, or null to leave the local state unchanged.</param>
/// <param name="settle">True to locally settle each matching delivery.</param>
/// <returns>This sender instance.</returns>
/// <exception cref="ArgumentNullException">If the filter is null.</exception>
public ISender Disposition(Predicate<IOutgoingDelivery> filter, IDeliveryState state, bool settle)
{
    CheckLinkOperable("Cannot apply disposition");

    if (filter == null)
    {
        throw new ArgumentNullException(nameof(filter), "Supplied filter cannot be null");
    }

    // Keys are collected here and removed once enumeration has finished rather than
    // mutating the map while it is being iterated.
    List<uint> settledKeys = new List<uint>();

    try
    {
        foreach (KeyValuePair<uint, ProtonOutgoingDelivery> entry in unsettled)
        {
            ProtonOutgoingDelivery delivery = entry.Value;

            if (!filter.Invoke(delivery))
            {
                continue;
            }

            if (state != null)
            {
                delivery.LocalState(state);
            }

            if (settle)
            {
                delivery.LocallySettled();
                settledKeys.Add(entry.Key);
            }

            sessionWindow.ProcessDisposition(this, delivery);
        }
    }
    finally
    {
        // Mirror the single-delivery Disposition: anything already marked settled is removed
        // from tracking even if the filter or the session window threw part way through.
        foreach (uint key in settledKeys)
        {
            unsettled.Remove(key);
        }
    }

    return this;
}
/// <summary>
/// Locally settles every tracked unsettled delivery accepted by the filter without
/// changing its local delivery state. Equivalent to calling
/// <c>Disposition(filter, null, true)</c>.
/// </summary>
/// <param name="filter">Predicate selecting the deliveries to settle; must not be null.</param>
/// <returns>This sender instance.</returns>
public ISender Settle(Predicate<IOutgoingDelivery> filter) => Disposition(filter, null, true);
/// <summary>
/// The delivery currently in progress on this sender, or null when there is none.
/// </summary>
public IOutgoingDelivery Current
{
    get { return current; }
}
/// <summary>
/// Creates the next outgoing delivery and makes it the current delivery. When a
/// delivery tag generator is configured the new delivery is given the next tag.
/// </summary>
/// <returns>The newly created current delivery.</returns>
/// <exception cref="InvalidOperationException">
/// If a current delivery already exists and has not been completed or aborted.
/// </exception>
public IOutgoingDelivery Next()
{
    CheckLinkOperable("Cannot update next delivery");

    if (current != null)
    {
        throw new InvalidOperationException("Current delivery is not complete and cannot be advanced.");
    }

    // Publish the new delivery as current before asking the generator for a tag.
    current = new ProtonOutgoingDelivery(this);

    if (autoTagGenerator != null)
    {
        current.DeliveryTag = autoTagGenerator.NextTag();
    }

    return current;
}
/// <summary>
/// True when at least one delivery is still tracked as unsettled by this sender.
/// </summary>
public bool HasUnsettled
{
    get { return unsettled.Count > 0; }
}
/// <summary>
/// A snapshot of the deliveries currently tracked as unsettled. A new list is
/// created on every read, so later changes to the tracking map are not reflected
/// in a previously returned collection.
/// </summary>
public IReadOnlyCollection<IOutgoingDelivery> Unsettled
{
    get
    {
        List<IOutgoingDelivery> snapshot = new List<IOutgoingDelivery>(unsettled.Values);
        return snapshot;
    }
}
/// <summary>
/// The generator used by <c>Next()</c> to assign tags to new deliveries. When null,
/// new deliveries are created without an automatically assigned tag.
/// </summary>
public IDeliveryTagGenerator DeliveryTagGenerator
{
    get
    {
        return autoTagGenerator;
    }
    set
    {
        autoTagGenerator = value;
    }
}
/// <summary>
/// Registers the link level handler that is invoked for remote delivery state
/// updates on deliveries that have no handler of their own. Replaces any handler
/// set previously; passing null removes it.
/// </summary>
/// <param name="handler">The handler to register, or null.</param>
/// <returns>This sender instance.</returns>
public ISender DeliveryStateUpdatedHandler(Action<IOutgoingDelivery> handler)
{
    this.deliveryUpdatedEventHandler = handler;

    return this;
}
#endregion
#region Proton Link event and state change handlers
/// <summary>
/// Sender specific hook for a remote Attach. The sender performs no additional
/// work here, so the override is intentionally empty.
/// </summary>
/// <param name="attach">The remote Attach performative (unused).</param>
protected override void HandleRemoteAttach(Attach attach)
{
}
/// <summary>
/// Sender specific hook for a remote Detach. The sender performs no additional
/// work here, so the override is intentionally empty.
/// </summary>
/// <param name="detach">The remote Detach performative (unused).</param>
protected override void HandleRemoteDetach(Detach detach)
{
}
/// <summary>
/// Always fails: a sender link never owns incoming deliveries, so a disposition
/// for one cannot be handled here.
/// </summary>
/// <param name="disposition">The remote Disposition performative (unused).</param>
/// <param name="delivery">The incoming delivery (unused).</param>
/// <exception cref="InvalidOperationException">Always thrown.</exception>
protected override void HandleRemoteDisposition(Disposition disposition, ProtonIncomingDelivery delivery) =>
    throw new InvalidOperationException("Sender link should never handle dispositions for incoming deliveries");
/// <summary>
/// Applies a remote disposition to one of this sender's deliveries. The remote
/// state is replaced when the disposition carries a state that differs from the one
/// recorded on the delivery, and the delivery is marked remotely settled when the
/// disposition is settled and the delivery was not already. The delivery state
/// updated event fires only when at least one of those changes was made.
/// </summary>
/// <param name="disposition">The remote Disposition performative.</param>
/// <param name="delivery">The outgoing delivery the disposition applies to.</param>
protected override void HandleRemoteDisposition(Disposition disposition, ProtonOutgoingDelivery delivery)
{
    bool stateChanged = disposition.State != null && !disposition.State.Equals(delivery.RemoteState);
    if (stateChanged)
    {
        delivery.RemoteState = disposition.State;
    }

    // Evaluated after the state update so the checks run in the original order.
    bool newlySettled = disposition.Settled && !delivery.IsRemotelySettled;
    if (newlySettled)
    {
        delivery.RemotelySettled();
    }

    if (stateChanged || newlySettled)
    {
        FireDeliveryStateUpdated(delivery);
    }
}
/// <summary>
/// Processes a remote Flow for this link. The flow is first handed to the credit
/// state, then the sender's effective credit is recomputed as
/// <c>flow.DeliveryCount + flow.LinkCredit - local delivery count</c> and stored.
/// When the link is locally open the cached sendable flag is refreshed and the
/// credit state updated event is fired.
/// </summary>
/// <param name="flow">The remote Flow performative.</param>
protected override void HandleRemoteFlow(Flow flow)
{
    ProtonLinkCreditState creditState = CreditState;
    creditState.RemoteFlow(flow);

    uint existingDeliveryCount = creditState.DeliveryCount;
    uint remoteDeliveryCount = flow.DeliveryCount;
    uint effectiveCredit;

    // delivery-count is a wrapping uint sequence number, so the limit and the difference are
    // computed with modular arithmetic. The unchecked block makes that wrap-around explicit
    // and independent of the project's overflow checking setting. A receiver's flow may not
    // carry a delivery-count (the sender initializes it on attach); the original code relies
    // on it reading as zero in that case.
    unchecked
    {
        uint newDeliveryCountLimit = remoteDeliveryCount + flow.LinkCredit;
        effectiveCredit = newDeliveryCountLimit - existingDeliveryCount;
    }

    // A uint is never negative, so the computed value (including zero) is stored directly.
    creditState.UpdateCredit(effectiveCredit);

    if (IsLocallyOpen)
    {
        sendable = Credit > 0 && sessionWindow.IsSendable;
        FireCreditStateUpdated();
    }
}
/// <summary>
/// Always fails: the sender end of a link cannot accept incoming transfers.
/// </summary>
/// <param name="transfer">The remote Transfer performative (unused).</param>
/// <param name="payload">The transfer payload (unused).</param>
/// <param name="delivery">Never assigned because the method always throws.</param>
/// <exception cref="InvalidOperationException">Always thrown.</exception>
protected override void HandleRemoteTransfer(Transfer transfer, IProtonBuffer payload, out ProtonIncomingDelivery delivery) =>
    throw new InvalidOperationException("Sender end cannot process incoming transfers");
/// <summary>
/// Re-evaluates the cached sendable flag after the session outgoing window changed
/// and fires the credit state updated event only when the flag actually flipped.
/// </summary>
/// <param name="window">
/// The session outgoing window that changed. The retained session window field is
/// what is actually consulted.
/// </param>
protected override void HandleSessionCreditStateUpdates(in ProtonSessionOutgoingWindow window)
{
    bool wasSendable = sendable;
    bool nowSendable = Credit > 0 && sessionWindow.IsSendable;

    sendable = nowSendable;

    if (wasSendable != nowSendable)
    {
        FireCreditStateUpdated();
    }
}
/// <summary>
/// Hook for session incoming window changes. The sender does nothing with these,
/// so the override is intentionally empty.
/// </summary>
/// <param name="window">The session incoming window (unused).</param>
protected override void HandleSessionCreditStateUpdates(in ProtonSessionIncomingWindow window)
{
}
/// <summary>
/// Fills in the link specific fields of an outgoing Flow: the current link credit,
/// this link's handle, the local delivery count and the drain flag.
/// </summary>
/// <param name="flow">The Flow performative to populate.</param>
protected override void HandleDecorateOfOutgoingFlow(Flow flow)
{
    // Credit and IsDraining read straight from the credit state, so it is used directly here.
    flow.LinkCredit = CreditState.Credit;
    flow.Handle = Handle;
    flow.DeliveryCount = CreditState.DeliveryCount;
    flow.Drain = CreditState.IsDrain;
}
/// <summary>
/// Invoked when the link becomes locally opened. Seeds the initial delivery count
/// on the local Attach and computes the initial sendable flag from the current
/// credit and the session outgoing window.
/// </summary>
protected override void TransitionedToLocallyOpened()
{
    // Zero is used when no delivery id has been claimed yet.
    // NOTE(review): the value comes from the in-progress delivery id rather than the credit
    // state's delivery count - confirm this is the intended source.
    localAttach.InitialDeliveryCount = currentDeliveryId.GetValueOrDefault();

    bool hasCredit = Credit > 0;
    sendable = hasCredit && sessionWindow.IsSendable;
}
// Every state transition below clears the cached sendable flag and does nothing else.

/// <summary>Clears the sendable flag when the link is locally detached.</summary>
protected override void TransitionedToLocallyDetached() => sendable = false;

/// <summary>Clears the sendable flag when the link is locally closed.</summary>
protected override void TransitionedToLocallyClosed() => sendable = false;

/// <summary>Clears the sendable flag when the link becomes remotely opened.</summary>
protected override void TransitionToRemotelyOpenedState() => sendable = false;

/// <summary>Clears the sendable flag when the link is remotely detached.</summary>
protected override void TransitionToRemotelyDetached() => sendable = false;

/// <summary>Clears the sendable flag when the link is remotely closed.</summary>
protected override void TransitionToRemotelyClosed() => sendable = false;

/// <summary>Clears the sendable flag when the parent is locally closed.</summary>
protected override void TransitionToParentLocallyClosed() => sendable = false;

/// <summary>Clears the sendable flag when the parent is remotely closed.</summary>
protected override void TransitionToParentRemotelyClosed() => sendable = false;
#endregion
#region Internal Proton Sender API
/// <summary>
/// Returns this instance typed as the concrete sender.
/// </summary>
internal override ProtonSender Self()
{
    return this;
}
/// <summary>
/// True when a link level delivery state updated handler has been registered.
/// </summary>
internal bool HasDeliveryStateUpdateHandler
{
    get { return deliveryUpdatedEventHandler != null; }
}
/// <summary>
/// Signals that the state of a delivery was updated. A handler registered on the
/// delivery itself takes precedence; otherwise the link level handler is invoked
/// when one is set, and nothing happens when neither exists.
/// </summary>
/// <param name="delivery">The delivery whose state changed.</param>
internal void FireDeliveryStateUpdated(ProtonOutgoingDelivery delivery)
{
    if (delivery.HasDeliveryStateUpdatedHandler)
    {
        delivery.FireDeliveryStateUpdated();
        return;
    }

    Action<IOutgoingDelivery> linkHandler = deliveryUpdatedEventHandler;
    if (linkHandler != null)
    {
        linkHandler.Invoke(delivery);
    }
}
/// <summary>
/// Signals that the remote opened this sender. The first available handler is used,
/// in this order: the link's own open handler, the session's sender open handler,
/// the connection's sender open handler. If none is registered an informational
/// message is logged instead.
/// </summary>
/// <returns>This sender instance.</returns>
internal override ISender FireRemoteOpen()
{
    if (HasOpenHandler)
    {
        base.FireRemoteOpen();
        return this;
    }

    if (session.HasSenderOpenEventHandler)
    {
        session.FireRemoteSenderOpened(this);
        return this;
    }

    if (connection.HasSenderOpenEventHandler)
    {
        connection.FireRemoteSenderOpened(this);
        return this;
    }

    LOG.Info("Sender opened but no event handler registered to inform: {0}", this);
    return this;
}
/// <summary>
/// Writes a transfer for the given delivery through the session outgoing window.
/// Nothing is sent when the sender is not currently sendable. On the first write of
/// a delivery a delivery id is claimed from the session window, and an unsettled
/// delivery with no prior transfers is added to the unsettled tracking map. When the
/// write is the completing one and no readable payload remains, the delivery is
/// marked complete, the current delivery is cleared and one credit is consumed.
/// </summary>
/// <param name="delivery">The delivery being sent.</param>
/// <param name="buffer">The payload to write; may be null.</param>
/// <param name="complete">True when this write finishes the delivery.</param>
internal void Send(ProtonOutgoingDelivery delivery, IProtonBuffer buffer, bool complete)
{
    CheckLinkOperable("Cannot send when link has become inoperable");

    if (!IsSendable)
    {
        return;
    }

    if (!currentDeliveryId.HasValue)
    {
        currentDeliveryId = sessionWindow.ClaimNextDeliveryId;
        delivery.DeliveryId = currentDeliveryId.Value;
    }

    bool firstWriteOfUnsettled = !delivery.IsSettled && delivery.TransferCount == 0;
    if (firstWriteOfUnsettled)
    {
        unsettled.Add(delivery.DeliveryId, delivery);
    }

    try
    {
        bool windowStillOpen = sessionWindow.ProcessSend(this, delivery, buffer, complete);
        sendable = windowStillOpen && Credit > 0;
    }
    finally
    {
        // The bookkeeping runs even if the write threw so the link does not stay stuck on a
        // delivery whose payload has been fully consumed.
        bool deliveryFinished = complete && (buffer == null || !buffer.IsReadable);
        if (deliveryFinished)
        {
            delivery.MarkComplete();
            currentDeliveryId = null;
            current = null;
            CreditState.IncrementDeliveryCount();
            CreditState.DecrementCredit();

            if (Credit == 0)
            {
                sendable = false;
                CreditState.ClearDrain();
            }
        }
    }
}
/// <summary>
/// Aborts the given delivery. The session outgoing window is asked to process the
/// abort only when at least one transfer was already written for the delivery.
/// Whether or not that throws, the delivery is removed from the unsettled tracking
/// map and the current delivery and its claimed delivery id are cleared.
/// </summary>
/// <param name="delivery">The delivery to abort.</param>
internal void Abort(ProtonOutgoingDelivery delivery)
{
    CheckLinkOperable("Cannot abort Transfer");

    try
    {
        bool transferStarted = delivery.TransferCount > 0;
        if (transferStarted)
        {
            sessionWindow.ProcessAbort(this, delivery);
        }
    }
    finally
    {
        unsettled.Remove(delivery.DeliveryId);
        currentDeliveryId = null;
        current = null;
    }
}
/// <summary>
/// Hands a disposition change for a single delivery to the session outgoing window.
/// The link operability check is skipped when the delivery is already remotely
/// settled. Once the delivery reports itself settled it is removed from the
/// unsettled tracking map, even if processing the disposition threw.
/// </summary>
/// <param name="delivery">The delivery whose disposition changed.</param>
internal void Disposition(ProtonOutgoingDelivery delivery)
{
    bool remotelySettled = delivery.IsRemotelySettled;
    if (!remotelySettled)
    {
        CheckLinkOperable("Cannot set a disposition");
    }

    try
    {
        sessionWindow.ProcessDisposition(this, delivery);
    }
    finally
    {
        bool settled = delivery.IsSettled;
        if (settled)
        {
            unsettled.Remove(delivery.DeliveryId);
        }
    }
}
#endregion
}
}