in ProcessManager/CacheManager/CacheConnectorUpsert.cs [263:303]
private static async Task<bool> PublishServiceBusQueueEvent(APITask task, string taskBody, AppInsightsLogger appInsightsLogger)
{
string serviceBusConnectionString = Environment.GetEnvironmentVariable(SERVICE_BUS_CONNECTION_STRING_VARIABLE_NAME, EnvironmentVariableTarget.Process);
// A queue must be created for each endpoint, ex:
// The queue name for http://52.224.89.22/v1/paws/consolidate should be http52.224.89.22v1pawsconsolidate
var queueName = task.Endpoint.Replace(".", string.Empty);
queueName = queueName.Replace("/", string.Empty);
queueName = queueName.Replace(":", string.Empty);
var messageProperties = new Dictionary<string,object>
{
{ "TaskId", task.TaskId },
{ "Uri", task.Endpoint }
};
IQueueClient queueClient = new QueueClient(serviceBusConnectionString, queueName);
try
{
Message message = new Message(Encoding.UTF8.GetBytes(taskBody));
message.UserProperties["TaskId"] = task.TaskId;
message.UserProperties["Uri"] = task.Endpoint;
// Write the body of the message to the console.
Console.WriteLine($"Sending taskId {task.TaskId} to queue {queueName}");
// Send the message to the queue.
await queueClient.SendAsync(message);
await queueClient.CloseAsync();
appInsightsLogger.LogInformation($"Sent task {task.TaskId} to queue {queueName}.", task.Endpoint, task.TaskId);
}
catch (Exception ex)
{
appInsightsLogger.LogError(ex, task.Endpoint, task.TaskId);
return false;
}
return true;
}