public Task InvokeOperation()

in src/ActiveMQ/Providers/ActiveMQTriggerServiceOperationProvider.cs [325:388]


        public Task<ServiceOperationResponse> InvokeOperation(string operationId, InsensitiveDictionary<JToken> connectionParameters,
            ServiceOperationRequest serviceOperationRequest)
        {
            try
            {
                ServiceOpertionsProviderValidation.OperationId(operationId);

                ActiveMQTriggerParameters activeMQTriggerParameters = new ActiveMQTriggerParameters(connectionParameters, serviceOperationRequest);

                var connectionFactory = new NmsConnectionFactory(activeMQTriggerParameters.UserName, activeMQTriggerParameters.Password, activeMQTriggerParameters.BrokerUri);
                using (var connection = connectionFactory.CreateConnection())
                {
                    connection.ClientId = activeMQTriggerParameters.ClientId;

                    using (var session = connection.CreateSession(AcknowledgementMode.Transactional))
                    {
                        using (var queue = session.GetQueue(activeMQTriggerParameters.QueueName))
                        {
                            using (var consumer = session.CreateConsumer(queue))
                            {
                                connection.Start();

                                List<JObject> receiveMessages = new List<JObject>();

                                for (int i = 0; i < activeMQTriggerParameters.MaximumNumber; i++)
                                {
                                    var message = consumer.Receive(messageReceiveTimeout) as ITextMessage;
                                    if (message != null)
                                    {
                                        receiveMessages.Add(new JObject
                                    {
                                        { "contentData", message.Text },
                                        { "Properties", new JObject{ { "NMSMessageId", message.NMSMessageId } } },
                                    });
                                    }
                                    else
                                    {
                                        //Will exit the loop if there are no message
                                        break;
                                    }
                                }
                                session.Commit();
                                session.Close();
                                connection.Close();

                                if (receiveMessages.Count == 0)
                                {
                                    return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JObject.FromObject(new { message = "No messages" }), System.Net.HttpStatusCode.Accepted));
                                }
                                else
                                {
                                    return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JArray.FromObject(receiveMessages), System.Net.HttpStatusCode.OK));
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception e)
            {
                var error = e.Message;
                return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JObject.FromObject(new { message = error }), System.Net.HttpStatusCode.InternalServerError));
            }
        }