public static async Task TaskRun()

in ProcessManager/CacheManager/CacheConnectorUpsert.cs [41:213]


        public static async Task<IActionResult> TaskRun([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req, ILogger logger)
        {
            IDatabase db = null;
            AppInsightsLogger appInsightsLogger = new AppInsightsLogger(logger, LOGGING_SERVICE_NAME, LOGGING_SERVICE_VERSION);

            var redisOperation = "insert";

            APITask task = null;

            if (req.Body != null)
            {
                string body = string.Empty;
                try
                {
                    using (StreamReader reader = new StreamReader(req.Body))
                    {
                        if (reader.BaseStream.Length > 0)
                        {
                            body = reader.ReadToEnd();

                            if (body.StartsWith("["))
                            {
                                task = JsonConvert.DeserializeObject<APITask[]>(body)[0];
                            }
                            else
                            {
                                task = JsonConvert.DeserializeObject<APITask>(body);
                            }
                        }
                        else
                        {
                            appInsightsLogger.LogWarning("Parameters missing. Unable to create task.");
                            return new BadRequestResult();
                        }
                    }
                }
                catch (Exception ex)
                {
                    appInsightsLogger.LogInformation(ex.Message + ex.StackTrace.ToString());
                    appInsightsLogger.LogError(ex);
                    appInsightsLogger.LogRedisUpsert("Redis upsert failed.", redisOperation, task.Timestamp, body);
                    return new StatusCodeResult(500);
                }
            }
            else
            {
                appInsightsLogger.LogInformation("Parameters missing. Unable to create task.");
                appInsightsLogger.LogWarning("Parameters missing. Unable to create task.");
                return new BadRequestResult();
            }

            if (!String.IsNullOrWhiteSpace(task.TaskId))
            {
                appInsightsLogger.LogInformation("Updating status", task.Endpoint, task.TaskId);
                redisOperation = "update";
            }
            else
            {
                task.TaskId = Guid.NewGuid().ToString();
            }

            task.Timestamp = DateTime.UtcNow.ToString();

            try
            {
                db = RedisConnection.GetDatabase();
            }
            catch (Exception ex)
            {
                appInsightsLogger.LogInformation(ex.Message + ex.StackTrace.ToString());
                appInsightsLogger.LogError(ex, task.Endpoint, task.TaskId);
                appInsightsLogger.LogRedisUpsert("Redis upsert failed.", redisOperation, task.Timestamp, task.Endpoint, task.TaskId);
                return new StatusCodeResult(500);
            }

            string serializedTask = string.Empty;
            try
            {
                var taskBody = task.Body;
                task.Body = null;

                serializedTask = JsonConvert.SerializeObject(task);
                RedisValue res = RedisValue.Null;

                var upsertTransaction = db.CreateTransaction();

                upsertTransaction.StringSetAsync(task.TaskId, serializedTask);

                // Get seconds since epoch
                TimeSpan ts = (DateTime.UtcNow - new DateTime(1970, 1, 1));
                int timestamp = (int)ts.TotalSeconds;

                upsertTransaction.SortedSetAddAsync(string.Format("{0}_{1}", task.EndpointPath, task.BackendStatus), new SortedSetEntry[] { new SortedSetEntry(task.TaskId, timestamp) });

                if (task.BackendStatus.Equals(BACKEND_STATUS_RUNNING))
                {
                    upsertTransaction.SortedSetRemoveAsync(string.Format("{0}_{1}", task.EndpointPath, BACKEND_STATUS_CREATED), task.TaskId);
                }
                else if (task.BackendStatus.Equals(BACKEND_STATUS_COMPLETED) || task.BackendStatus.Equals(BACKEND_STATUS_FAILED))
                {
                    upsertTransaction.SortedSetRemoveAsync(string.Format("{0}_{1}", task.EndpointPath, BACKEND_STATUS_RUNNING), task.TaskId);
                }

                bool isSubsequentPipelineCall = false;

                bool isPublish = false;
                bool.TryParse(task.PublishToGrid.ToString(), out isPublish);

                if (isPublish == true || task.PublishToGrid == true)
                {
                    if (string.IsNullOrEmpty(taskBody))
                    {
                        // This is a subsequent pipeline publish.
                        isSubsequentPipelineCall = true;
                    }
                    else
                    {
                        upsertTransaction.StringSetAsync(string.Format("{0}_{1}", task.TaskId, GRID_PUBLISH_RECORD_KEY), taskBody);
                    }
                }

                var watch = Stopwatch.StartNew();
                if (await upsertTransaction.ExecuteAsync() == false)
                {
                    var ex = new Exception("Unable to complete redis transaction.");
                    appInsightsLogger.LogError(ex, task.Endpoint, task.TaskId);
                    throw ex;
                }
                watch.Stop();
                appInsightsLogger.LogInformation(string.Format("ExecuteAsync duration: {0}", watch.ElapsedMilliseconds), task.Endpoint, task.TaskId);

                if (isSubsequentPipelineCall)
                {
                    // We have to get the original body, since it's currently empty.
                    taskBody = await db.StringGetAsync(string.Format("{0}_{1}", task.TaskId, GRID_PUBLISH_RECORD_KEY));
                }

                if (task.PublishToGrid)
                {
                    watch.Restart();
                    if (await PublishEvent(task, taskBody, appInsightsLogger) == false)
                    {
                        // Move task to failed
                        var updateTransaction = db.CreateTransaction();
                        task.Status = "Failed - unable to send to backend service.";
                        task.BackendStatus = BACKEND_STATUS_FAILED;
                        string updateBody = JsonConvert.SerializeObject(task);

                        updateTransaction.StringSetAsync(task.TaskId, updateBody);
                        updateTransaction.SortedSetAddAsync(string.Format("{0}_{1}", task.EndpointPath, task.BackendStatus), new SortedSetEntry[] { new SortedSetEntry(task.TaskId, timestamp) });
                        updateTransaction.SortedSetRemoveAsync(string.Format("{0}_{1}", task.EndpointPath, BACKEND_STATUS_CREATED), task.TaskId);

                        if (await updateTransaction.ExecuteAsync() == false)
                        {
                            var ex = new Exception("Unable to complete redis transaction.");
                            appInsightsLogger.LogError(ex, task.Endpoint, task.TaskId);
                            throw ex;
                        }
                    }
                    watch.Stop();
                    appInsightsLogger.LogInformation(string.Format("PublishEvent duration: {0}", watch.ElapsedMilliseconds), task.Endpoint, task.TaskId);
                }
            }
            catch (Exception ex)
            {
                appInsightsLogger.LogInformation(ex.Message + ex.StackTrace.ToString());
                appInsightsLogger.LogError(ex, task.Endpoint, task.TaskId);
                appInsightsLogger.LogRedisUpsert("Redis upsert failed.", redisOperation, task.Timestamp, serializedTask, task.Endpoint, task.TaskId);
                return new StatusCodeResult(500);
            }

            return new OkObjectResult(serializedTask);
        }