in src/ActiveMQ/Providers/ActiveMQTriggerServiceOperationProvider.cs [325:388]
/// <summary>
/// Polls the configured queue once: receives up to <c>MaximumNumber</c> messages inside a
/// transactional session, commits the session, and returns the received messages.
/// </summary>
/// <param name="operationId">Identifier of the operation being invoked; validated before any broker work is done.</param>
/// <param name="connectionParameters">Connection-level values used to build the trigger parameters.</param>
/// <param name="serviceOperationRequest">The incoming request, also used to build the trigger parameters.</param>
/// <returns>
/// An already-completed task holding one of:
/// 200 (OK) with a JSON array of <c>{ contentData, Properties: { NMSMessageId } }</c> objects when at least one message was received;
/// 202 (Accepted) with <c>{ message: "No messages" }</c> when none was received;
/// 500 (InternalServerError) with <c>{ message: &lt;exception message&gt; }</c> when anything throws.
/// </returns>
/// <remarks>
/// The work is performed synchronously on the calling thread; the task is only a wrapper around the result.
/// </remarks>
public Task<ServiceOperationResponse> InvokeOperation(string operationId, InsensitiveDictionary<JToken> connectionParameters,
    ServiceOperationRequest serviceOperationRequest)
{
    try
    {
        ServiceOpertionsProviderValidation.OperationId(operationId);
        var triggerParameters = new ActiveMQTriggerParameters(connectionParameters, serviceOperationRequest);
        var connectionFactory = new NmsConnectionFactory(triggerParameters.UserName, triggerParameters.Password, triggerParameters.BrokerUri);

        using (var connection = connectionFactory.CreateConnection())
        {
            connection.ClientId = triggerParameters.ClientId;

            using (var session = connection.CreateSession(AcknowledgementMode.Transactional))
            using (var queue = session.GetQueue(triggerParameters.QueueName))
            using (var consumer = session.CreateConsumer(queue))
            {
                connection.Start();

                var receivedMessages = new List<JObject>();
                for (int i = 0; i < triggerParameters.MaximumNumber; i++)
                {
                    // Receive as the base message type: casting the result directly to
                    // ITextMessage made a non-text message look like "no message", which ended
                    // the loop and then let Commit() acknowledge (and so discard) that message.
                    IMessage message = consumer.Receive(messageReceiveTimeout);
                    if (message == null)
                    {
                        // Nothing arrived within the timeout: stop polling.
                        break;
                    }

                    // Only text messages carry a body this trigger can expose. Any other message
                    // type is still reported (with a null contentData) so that it is not consumed
                    // silently and does not cut the batch short.
                    var textMessage = message as ITextMessage;
                    receivedMessages.Add(new JObject
                    {
                        { "contentData", textMessage != null ? (JToken)textMessage.Text : JValue.CreateNull() },
                        { "Properties", new JObject { { "NMSMessageId", message.NMSMessageId } } },
                    });
                }

                // Build the response before committing: if building it throws, the session is
                // disposed without a commit instead of acknowledging messages that were never returned.
                ServiceOperationResponse response;
                if (receivedMessages.Count == 0)
                {
                    response = new ActiveMQTriggerResponse(JObject.FromObject(new { message = "No messages" }), System.Net.HttpStatusCode.Accepted);
                }
                else
                {
                    response = new ActiveMQTriggerResponse(JArray.FromObject(receivedMessages), System.Net.HttpStatusCode.OK);
                }

                session.Commit();
                session.Close();
                connection.Close();

                return Task.FromResult(response);
            }
        }
    }
    catch (Exception e)
    {
        // Every failure (validation, connection, receive, commit) is reported to the caller as a
        // 500 response carrying the exception message, rather than being thrown.
        var error = e.Message;
        return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JObject.FromObject(new { message = error }), System.Net.HttpStatusCode.InternalServerError));
    }
}