in ProcessManager/BackendQueueProcessor/BackendQueueProcessor.cs [28:81]
public static async Task ServiceBusQueueProcessorAsync(
[ServiceBusTrigger("%SERVICE_BUS_QUEUE%")] Message message, MessageReceiver messageReceiver, ILogger logger)
{
var timestamp = DateTime.UtcNow;
var queueName = Environment.GetEnvironmentVariable("SERVICE_BUS_QUEUE", EnvironmentVariableTarget.Process);
logger.LogTrace($@"[{message.UserProperties[@"TaskId"]}]: Message received at {timestamp}: {JObject.FromObject(message)}");
AppInsightsLogger appInsightsLogger = new AppInsightsLogger(logger, LOGGING_SERVICE_NAME + ": " + queueName, LOGGING_SERVICE_VERSION);
var enqueuedTime = message.ScheduledEnqueueTimeUtc;
var elapsedTimeMs = (timestamp - enqueuedTime).TotalMilliseconds;
var taskId = message.UserProperties["TaskId"].ToString();
var backendUri = message.UserProperties["Uri"].ToString();
var messageBody = Encoding.UTF8.GetString(message.Body);
try
{
appInsightsLogger.LogInformation($"Sending request to {backendUri} for taskId {taskId} from queue {queueName}. Queued for {elapsedTimeMs/60} seconds.", backendUri, taskId);
var client = new HttpClient();
client.DefaultRequestHeaders.Add("taskId", taskId);
var httpContent = new StringContent(messageBody, Encoding.UTF8, "application/json");
var res = await client.PostAsync(backendUri, httpContent);
if (res.StatusCode == (System.Net.HttpStatusCode)429) // Special return value indicating that the service is busy.
{
var retryDelay = int.Parse(Environment.GetEnvironmentVariable(QUEUE_RETRY_DELAY_MS_VARIABLE_NAME, EnvironmentVariableTarget.Process));
appInsightsLogger.LogInformation($"Service is busy. Will try again in {retryDelay/1000} seconds.", backendUri, taskId);
await UpdateTaskStatus(taskId, backendUri, messageBody, $"Awaiting service availability. Queued for {elapsedTimeMs/60} seconds.", "created", appInsightsLogger);
// Artifical delay is needed since the ServiceBusTrigger will retry immediately.
await Task.Delay(retryDelay);
await messageReceiver.AbandonAsync(message.SystemProperties.LockToken);
throw new ApplicationException($"Service is busy. Will try again in {retryDelay/1000} seconds.");
}
else if (!res.IsSuccessStatusCode)
{
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken); //Need to complete even though we have failure. This removes it from the queue to avoid an infinite state.
appInsightsLogger.LogWarning($"Unable to send request to backend. Status: {res.StatusCode.ToString()}, Reason: {res.ReasonPhrase}", backendUri, taskId);
await UpdateTaskStatus(taskId, backendUri, messageBody, $"Unable to send request to backend.", "failed", appInsightsLogger);
}
else
{
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
appInsightsLogger.LogInformation($"taskId {taskId} has successfully been pushed to the backend from queue {queueName}. Queue time: {elapsedTimeMs/60} seconds.", backendUri, taskId);
}
}
catch (Exception ex)
{
appInsightsLogger.LogError(ex, backendUri, taskId);
}
}