src/NMS.AMQP/NmsQueueBrowser.cs (134 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System.Collections;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Util;
using Apache.NMS.AMQP.Util.Synchronization;

namespace Apache.NMS.AMQP
{
    /// <summary>
    /// Queue browser that doubles as its own <see cref="IEnumerator"/>.
    /// Messages are pulled one at a time from an internal browsing consumer
    /// that is created when enumeration starts and destroyed when no further
    /// message is available, when the session is not started, or when the
    /// browser is closed.
    /// </summary>
    public class NmsQueueBrowser : IQueueBrowser, IEnumerator
    {
        // Guards creation and destruction of the internal consumer.
        private readonly NmsSynchronizationMonitor syncRoot = new NmsSynchronizationMonitor();
        private readonly NmsSession session;
        private readonly IQueue destination;
        private readonly string selector;

        // Null whenever no browse is in progress.
        private volatile NmsMessageConsumer consumer;
        private IMessage current;
        private readonly AtomicBool closed = new AtomicBool();

        /// <summary>
        /// Creates a browser over <paramref name="destination"/>. No consumer is
        /// created here; that happens in <see cref="GetEnumerator"/> or <see cref="Reset"/>.
        /// </summary>
        /// <param name="session">Session used to create the internal consumer.</param>
        /// <param name="destination">Queue to browse.</param>
        /// <param name="selector">Message selector handed to the internal consumer.</param>
        public NmsQueueBrowser(NmsSession session, IQueue destination, string selector)
        {
            this.session = session;
            this.destination = destination;
            this.selector = selector;
        }

        /// <summary>
        /// Ensures the internal consumer exists and returns this instance as the enumerator.
        /// </summary>
        /// <returns>This browser.</returns>
        /// <exception cref="IllegalStateException">The browser has been closed.</exception>
        public IEnumerator GetEnumerator()
        {
            CheckClosed();
            CreateConsumer();
            return this;
        }

        /// <summary>
        /// Fetches the next message into <see cref="Current"/>.
        /// </summary>
        /// <returns>
        /// <c>true</c> when a message was obtained; <c>false</c> when none was
        /// available or the session is not started (in which case the internal
        /// consumer is destroyed).
        /// </returns>
        public bool MoveNext()
        {
            current = Next();

            // The started check intentionally runs after the fetch, as in the
            // original implementation.
            if (!session.IsStarted)
            {
                DestroyConsumer();
                return false;
            }

            return current != null;
        }

        /// <summary>
        /// Performs a single non-blocking receive on the internal consumer.
        /// </summary>
        /// <returns>
        /// The received message, or <c>null</c> when there is no consumer, nothing
        /// was received, or the receive raised an <see cref="NMSException"/>. In the
        /// latter two cases the internal consumer is destroyed.
        /// </returns>
        private IMessage Next()
        {
            // Snapshot the volatile field so the null check and the call use the same instance.
            IMessageConsumer activeConsumer = consumer;
            if (activeConsumer == null)
            {
                return null;
            }

            IMessage next = null;
            try
            {
                next = activeConsumer.ReceiveNoWait();
            }
            catch (NMSException e)
            {
                // Indexed placeholder: .NET composite formatting does not accept "{}".
                Tracer.WarnFormat("Error while receive the next message: {0}", e.Message);
            }

            if (next == null)
            {
                DestroyConsumer();
            }

            return next;
        }

        /// <summary>
        /// Destroys the current internal consumer and creates a new one.
        /// </summary>
        /// <exception cref="IllegalStateException">The browser has been closed.</exception>
        public void Reset()
        {
            CheckClosed();
            DestroyConsumer();
            CreateConsumer();
        }

        /// <summary>
        /// The message stored by the most recent <see cref="MoveNext"/> call; may be <c>null</c>.
        /// </summary>
        public object Current
        {
            get => current;
        }

        /// <summary>
        /// Closes the browser; equivalent to <see cref="Close"/>.
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        /// <summary>
        /// Synchronous wrapper around <see cref="CloseAsync"/>.
        /// </summary>
        public void Close()
        {
            CloseAsync().GetAsyncResult();
        }

        /// <summary>
        /// Marks the browser closed and destroys the internal consumer.
        /// Only the first call does any work; later calls complete immediately.
        /// </summary>
        public async Task CloseAsync()
        {
            if (closed.CompareAndSet(false, true))
            {
                await DestroyConsumerAsync().Await();
            }
        }

        /// <summary>
        /// The selector supplied at construction.
        /// </summary>
        public string MessageSelector => selector;

        /// <summary>
        /// The queue supplied at construction.
        /// </summary>
        public IQueue Queue => destination;

        /// <summary>
        /// Throws when the browser has already been closed.
        /// </summary>
        /// <exception cref="IllegalStateException">The browser has been closed.</exception>
        private void CheckClosed()
        {
            if (closed)
            {
                throw new IllegalStateException("The MessageConsumer is closed");
            }
        }

        /// <summary>
        /// Creates and initializes the internal browsing consumer if none exists,
        /// while holding <c>syncRoot</c>.
        /// </summary>
        private void CreateConsumer()
        {
            using (syncRoot.Lock())
            {
                if (consumer == null)
                {
                    NmsMessageConsumer messageConsumer = new NmsQueueBrowserMessageConsumer(
                        session.GetNextConsumerId(), session, destination, selector, false);
                    messageConsumer.Init().GetAsyncResult();

                    // Assign only after fully created and initialized.
                    consumer = messageConsumer;
                }
            }
        }

        /// <summary>
        /// Synchronous wrapper around <see cref="DestroyConsumerAsync"/>.
        /// </summary>
        private void DestroyConsumer()
        {
            DestroyConsumerAsync().GetAsyncResult();
        }

        /// <summary>
        /// Closes the internal consumer (if any) while holding <c>syncRoot</c> and
        /// always clears the field afterwards. An <see cref="NMSException"/> raised
        /// by the close is logged at debug level and not propagated.
        /// </summary>
        private async Task DestroyConsumerAsync()
        {
            using (await syncRoot.LockAsync().Await())
            {
                try
                {
                    NmsMessageConsumer activeConsumer = consumer;
                    if (activeConsumer != null)
                    {
                        await activeConsumer.CloseAsync().Await();
                    }
                }
                catch (NMSException e)
                {
                    // Placeholder added so the exception actually appears in the log output.
                    Tracer.DebugFormat("Error closing down internal consumer: {0}", e);
                }
                finally
                {
                    consumer = null;
                }
            }
        }

        /// <summary>
        /// Message consumer specialization whose <c>IsBrowser</c> property is overridden to <c>true</c>.
        /// </summary>
        public class NmsQueueBrowserMessageConsumer : NmsMessageConsumer
        {
            /// <summary>
            /// Forwards all arguments unchanged to the <see cref="NmsMessageConsumer"/> constructor.
            /// </summary>
            public NmsQueueBrowserMessageConsumer(NmsConsumerId consumerId, NmsSession session,
                IDestination destination, string selector, bool noLocal)
                : base(consumerId, session, destination, selector, noLocal)
            {
            }

            protected override bool IsBrowser => true;
        }
    }
}