in src/Cbs/AmqpCbsLink.cs [182:241]
async Task<RequestResponseAmqpLink> CreateCbsLinkAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
string address = CbsConstants.CbsAddress;
AmqpSession session = null;
RequestResponseAmqpLink link = null;
ExceptionDispatchInfo lastException = null;
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
while (timeoutHelper.RemainingTime() > TimeSpan.Zero)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
AmqpSessionSettings sessionSettings = new AmqpSessionSettings() { Properties = new Fields() };
session = this.connection.CreateSession(sessionSettings);
await Task.Factory.FromAsync(
(sn, t, k, c, s) => sn.BeginOpen(t, k, c, s),
(r) => ((AmqpSession)r.AsyncState).EndOpen(r),
session,
timeoutHelper.RemainingTime(),
cancellationToken,
session).ConfigureAwait(false);
Fields properties = new Fields();
properties.Add(CbsConstants.TimeoutName, (uint)AmqpConstants.DefaultTimeout.TotalMilliseconds);
link = new RequestResponseAmqpLink("cbs", session, address, properties);
await Task.Factory.FromAsync(
(lk, t, k, c, s) => lk.BeginOpen(t, k, c, s),
(r) => ((RequestResponseAmqpLink)r.AsyncState).EndOpen(r),
link,
timeoutHelper.RemainingTime(),
cancellationToken,
link).ConfigureAwait(false);
AmqpTrace.Provider.AmqpOpenEntitySucceeded(this, link.Name, address);
return link;
}
catch (Exception exception) when (!cancellationToken.IsCancellationRequested)
{
if (this.connection.IsClosing())
{
throw new OperationCanceledException("Connection is closing or closed.", exception);
}
lastException = ExceptionDispatchInfo.Capture(exception);
AmqpTrace.Provider.AmqpOpenEntityFailed(this, this.GetType().Name, address, exception);
}
await Task.Delay(1000, cancellationToken).ConfigureAwait(false);
}
link?.Abort();
session?.SafeClose();
cancellationToken.ThrowIfCancellationRequested();
lastException?.Throw();
return null;
}