in ProcessManager/CacheManager/CacheConnectorUpsert.cs [41:213]
public static async Task<IActionResult> TaskRun([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req, ILogger logger)
{
IDatabase db = null;
AppInsightsLogger appInsightsLogger = new AppInsightsLogger(logger, LOGGING_SERVICE_NAME, LOGGING_SERVICE_VERSION);
var redisOperation = "insert";
APITask task = null;
if (req.Body != null)
{
string body = string.Empty;
try
{
using (StreamReader reader = new StreamReader(req.Body))
{
if (reader.BaseStream.Length > 0)
{
body = reader.ReadToEnd();
if (body.StartsWith("["))
{
task = JsonConvert.DeserializeObject<APITask[]>(body)[0];
}
else
{
task = JsonConvert.DeserializeObject<APITask>(body);
}
}
else
{
appInsightsLogger.LogWarning("Parameters missing. Unable to create task.");
return new BadRequestResult();
}
}
}
catch (Exception ex)
{
appInsightsLogger.LogInformation(ex.Message + ex.StackTrace.ToString());
appInsightsLogger.LogError(ex);
appInsightsLogger.LogRedisUpsert("Redis upsert failed.", redisOperation, task.Timestamp, body);
return new StatusCodeResult(500);
}
}
else
{
appInsightsLogger.LogInformation("Parameters missing. Unable to create task.");
appInsightsLogger.LogWarning("Parameters missing. Unable to create task.");
return new BadRequestResult();
}
if (!String.IsNullOrWhiteSpace(task.TaskId))
{
appInsightsLogger.LogInformation("Updating status", task.Endpoint, task.TaskId);
redisOperation = "update";
}
else
{
task.TaskId = Guid.NewGuid().ToString();
}
task.Timestamp = DateTime.UtcNow.ToString();
try
{
db = RedisConnection.GetDatabase();
}
catch (Exception ex)
{
appInsightsLogger.LogInformation(ex.Message + ex.StackTrace.ToString());
appInsightsLogger.LogError(ex, task.Endpoint, task.TaskId);
appInsightsLogger.LogRedisUpsert("Redis upsert failed.", redisOperation, task.Timestamp, task.Endpoint, task.TaskId);
return new StatusCodeResult(500);
}
string serializedTask = string.Empty;
try
{
var taskBody = task.Body;
task.Body = null;
serializedTask = JsonConvert.SerializeObject(task);
RedisValue res = RedisValue.Null;
var upsertTransaction = db.CreateTransaction();
upsertTransaction.StringSetAsync(task.TaskId, serializedTask);
// Get seconds since epoch
TimeSpan ts = (DateTime.UtcNow - new DateTime(1970, 1, 1));
int timestamp = (int)ts.TotalSeconds;
upsertTransaction.SortedSetAddAsync(string.Format("{0}_{1}", task.EndpointPath, task.BackendStatus), new SortedSetEntry[] { new SortedSetEntry(task.TaskId, timestamp) });
if (task.BackendStatus.Equals(BACKEND_STATUS_RUNNING))
{
upsertTransaction.SortedSetRemoveAsync(string.Format("{0}_{1}", task.EndpointPath, BACKEND_STATUS_CREATED), task.TaskId);
}
else if (task.BackendStatus.Equals(BACKEND_STATUS_COMPLETED) || task.BackendStatus.Equals(BACKEND_STATUS_FAILED))
{
upsertTransaction.SortedSetRemoveAsync(string.Format("{0}_{1}", task.EndpointPath, BACKEND_STATUS_RUNNING), task.TaskId);
}
bool isSubsequentPipelineCall = false;
bool isPublish = false;
bool.TryParse(task.PublishToGrid.ToString(), out isPublish);
if (isPublish == true || task.PublishToGrid == true)
{
if (string.IsNullOrEmpty(taskBody))
{
// This is a subsequent pipeline publish.
isSubsequentPipelineCall = true;
}
else
{
upsertTransaction.StringSetAsync(string.Format("{0}_{1}", task.TaskId, GRID_PUBLISH_RECORD_KEY), taskBody);
}
}
var watch = Stopwatch.StartNew();
if (await upsertTransaction.ExecuteAsync() == false)
{
var ex = new Exception("Unable to complete redis transaction.");
appInsightsLogger.LogError(ex, task.Endpoint, task.TaskId);
throw ex;
}
watch.Stop();
appInsightsLogger.LogInformation(string.Format("ExecuteAsync duration: {0}", watch.ElapsedMilliseconds), task.Endpoint, task.TaskId);
if (isSubsequentPipelineCall)
{
// We have to get the original body, since it's currently empty.
taskBody = await db.StringGetAsync(string.Format("{0}_{1}", task.TaskId, GRID_PUBLISH_RECORD_KEY));
}
if (task.PublishToGrid)
{
watch.Restart();
if (await PublishEvent(task, taskBody, appInsightsLogger) == false)
{
// Move task to failed
var updateTransaction = db.CreateTransaction();
task.Status = "Failed - unable to send to backend service.";
task.BackendStatus = BACKEND_STATUS_FAILED;
string updateBody = JsonConvert.SerializeObject(task);
updateTransaction.StringSetAsync(task.TaskId, updateBody);
updateTransaction.SortedSetAddAsync(string.Format("{0}_{1}", task.EndpointPath, task.BackendStatus), new SortedSetEntry[] { new SortedSetEntry(task.TaskId, timestamp) });
updateTransaction.SortedSetRemoveAsync(string.Format("{0}_{1}", task.EndpointPath, BACKEND_STATUS_CREATED), task.TaskId);
if (await updateTransaction.ExecuteAsync() == false)
{
var ex = new Exception("Unable to complete redis transaction.");
appInsightsLogger.LogError(ex, task.Endpoint, task.TaskId);
throw ex;
}
}
watch.Stop();
appInsightsLogger.LogInformation(string.Format("PublishEvent duration: {0}", watch.ElapsedMilliseconds), task.Endpoint, task.TaskId);
}
}
catch (Exception ex)
{
appInsightsLogger.LogInformation(ex.Message + ex.StackTrace.ToString());
appInsightsLogger.LogError(ex, task.Endpoint, task.TaskId);
appInsightsLogger.LogRedisUpsert("Redis upsert failed.", redisOperation, task.Timestamp, serializedTask, task.Endpoint, task.TaskId);
return new StatusCodeResult(500);
}
return new OkObjectResult(serializedTask);
}