async Task CreateCbsLinkAsync()

in src/Cbs/AmqpCbsLink.cs [182:241]


        /// <summary>
        /// Opens a new session on the connection and a request/response link to the CBS address on
        /// top of it, retrying about once per second until an attempt succeeds or
        /// <paramref name="timeout"/> is used up.
        /// </summary>
        /// <param name="timeout">Total time budget shared by all attempts.</param>
        /// <param name="cancellationToken">
        /// Token checked before every attempt and passed to the open calls and to the retry delay.
        /// </param>
        /// <returns>
        /// The opened link. <c>null</c> is returned only when the loop ends without any recorded
        /// failure and without cancellation being requested.
        /// </returns>
        /// <exception cref="OperationCanceledException">
        /// Cancellation was observed before an attempt, during the retry delay or after the loop;
        /// or an attempt failed while the connection was closing.
        /// </exception>
        /// <remarks>
        /// When the time budget runs out, the failure of the last attempt is rethrown with its
        /// original stack trace. A session or link created by an attempt that does not end with an
        /// opened link is always released before the method retries or exits.
        /// </remarks>
        async Task<RequestResponseAmqpLink> CreateCbsLinkAsync(TimeSpan timeout, CancellationToken cancellationToken)
        {
            string address = CbsConstants.CbsAddress;
            ExceptionDispatchInfo lastException = null;
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);

            while (timeoutHelper.RemainingTime() > TimeSpan.Zero)
            {
                cancellationToken.ThrowIfCancellationRequested();

                // Scoped to a single attempt so that a failed attempt can never leave its
                // session/link behind when the next iteration creates fresh ones.
                AmqpSession session = null;
                RequestResponseAmqpLink link = null;
                bool succeeded = false;

                try
                {
                    AmqpSessionSettings sessionSettings = new AmqpSessionSettings() { Properties = new Fields() };
                    session = this.connection.CreateSession(sessionSettings);
                    await Task.Factory.FromAsync(
                        (sn, t, k, c, s) => sn.BeginOpen(t, k, c, s),
                        (r) => ((AmqpSession)r.AsyncState).EndOpen(r),
                        session,
                        timeoutHelper.RemainingTime(),
                        cancellationToken,
                        session).ConfigureAwait(false);

                    Fields properties = new Fields();
                    properties.Add(CbsConstants.TimeoutName, (uint)AmqpConstants.DefaultTimeout.TotalMilliseconds);
                    link = new RequestResponseAmqpLink("cbs", session, address, properties);
                    await Task.Factory.FromAsync(
                        (lk, t, k, c, s) => lk.BeginOpen(t, k, c, s),
                        (r) => ((RequestResponseAmqpLink)r.AsyncState).EndOpen(r),
                        link,
                        timeoutHelper.RemainingTime(),
                        cancellationToken,
                        link).ConfigureAwait(false);

                    AmqpTrace.Provider.AmqpOpenEntitySucceeded(this, link.Name, address);
                    succeeded = true;
                    return link;
                }
                catch (Exception exception) when (!cancellationToken.IsCancellationRequested)
                {
                    // The filter lets failures caused by cancellation propagate unchanged;
                    // everything else is either fatal (connection going away) or retried.
                    if (this.connection.IsClosing())
                    {
                        throw new OperationCanceledException("Connection is closing or closed.", exception);
                    }

                    lastException = ExceptionDispatchInfo.Capture(exception);
                    AmqpTrace.Provider.AmqpOpenEntityFailed(this, this.GetType().Name, address, exception);
                }
                finally
                {
                    // Runs on every exit path of the attempt (retry, cancellation, closing
                    // connection). Only a successfully opened link is handed to the caller.
                    if (!succeeded)
                    {
                        link?.Abort();
                        session?.SafeClose();
                    }
                }

                await Task.Delay(1000, cancellationToken).ConfigureAwait(false);
            }

            cancellationToken.ThrowIfCancellationRequested();

            // Rethrow the most recent failure with its original stack trace, if there was one.
            lastException?.Throw();
            return null;
        }