public async static Task Executor()

in Backend/src/Trackable.Func/PipelineFunction.cs [73:134]


        public async static Task Executor(
            [QueueTrigger("%TripDetectionQueue%")]string incomingMessage,
            [Blob("%TripDetectionContainerName%/{queueTrigger}")]CloudBlockBlob payloadBlob,
            [Queue("%TripDetectionQueue%")]ICollector<string> outgoingMessage,
            TraceWriter log)
        {
            using (var trackableContext = new TrackableContext(log))
            {
                await trackableContext.ExecuteAsync(async () =>
                {
                    var logger = trackableContext.LoggerFactory.CreateLogger("PipelineExecuter");
                    logger.LogInformation("Running module executor for incoming message {0}", incomingMessage);

                    // Retreive input blob contents
                    AzurePipelineState pipelineState;
                    var formatter = new BinaryFormatter();
                    formatter.Binder = new AdvancedSerializationBinder();
                    using (var inputMemoryStream = new MemoryStream())
                    {
                        await payloadBlob.DownloadToStreamAsync(inputMemoryStream);

                        inputMemoryStream.Seek(0, SeekOrigin.Begin);

                        pipelineState = (AzurePipelineState)formatter.Deserialize(inputMemoryStream);
                    }

                    logger.LogDebugSerialize("State rehydrated {0}", pipelineState);

                    // Get trip detector
                    var tripDetector = await trackableContext.TripDetectorFactory.Create((TripDetectorType)pipelineState.TripDetectorType);
                    var pipeline = new Pipeline(trackableContext.LoggerFactory);

                    // Execute pipeline
                    var output = await pipeline.ExecuteModule(tripDetector.GetModuleLoaders().ElementAt(pipelineState.ModuleIndex), pipelineState.Payload);

                    logger.LogDebugSerialize("Executed module with index {0} and recieved output {1}", pipelineState.ModuleIndex, output);

                    pipelineState.ModuleIndex++;
                    pipelineState.Payload = output;

                    if (pipelineState.ModuleIndex < tripDetector.GetModuleLoaders().Count())
                    {
                        using (var outStream = new MemoryStream())
                        {
                            formatter.Serialize(outStream, pipelineState);
                            outStream.Seek(0, SeekOrigin.Begin);
                            await payloadBlob.UploadFromStreamAsync(outStream);
                        }

                        logger.LogInformation("Wrote blob output for message {0}", incomingMessage);

                        outgoingMessage.Add(incomingMessage);

                        logger.LogInformation("Wrote queue output for message {0}", incomingMessage);
                    }
                    else
                    {
                        logger.LogInformation("Finished processing {0}", incomingMessage);
                    }
                });
            }
        }