in Backend/src/Trackable.Func/PipelineFunction.cs [73:134]
public async static Task Executor(
[QueueTrigger("%TripDetectionQueue%")]string incomingMessage,
[Blob("%TripDetectionContainerName%/{queueTrigger}")]CloudBlockBlob payloadBlob,
[Queue("%TripDetectionQueue%")]ICollector<string> outgoingMessage,
TraceWriter log)
{
using (var trackableContext = new TrackableContext(log))
{
await trackableContext.ExecuteAsync(async () =>
{
var logger = trackableContext.LoggerFactory.CreateLogger("PipelineExecuter");
logger.LogInformation("Running module executor for incoming message {0}", incomingMessage);
// Retreive input blob contents
AzurePipelineState pipelineState;
var formatter = new BinaryFormatter();
formatter.Binder = new AdvancedSerializationBinder();
using (var inputMemoryStream = new MemoryStream())
{
await payloadBlob.DownloadToStreamAsync(inputMemoryStream);
inputMemoryStream.Seek(0, SeekOrigin.Begin);
pipelineState = (AzurePipelineState)formatter.Deserialize(inputMemoryStream);
}
logger.LogDebugSerialize("State rehydrated {0}", pipelineState);
// Get trip detector
var tripDetector = await trackableContext.TripDetectorFactory.Create((TripDetectorType)pipelineState.TripDetectorType);
var pipeline = new Pipeline(trackableContext.LoggerFactory);
// Execute pipeline
var output = await pipeline.ExecuteModule(tripDetector.GetModuleLoaders().ElementAt(pipelineState.ModuleIndex), pipelineState.Payload);
logger.LogDebugSerialize("Executed module with index {0} and recieved output {1}", pipelineState.ModuleIndex, output);
pipelineState.ModuleIndex++;
pipelineState.Payload = output;
if (pipelineState.ModuleIndex < tripDetector.GetModuleLoaders().Count())
{
using (var outStream = new MemoryStream())
{
formatter.Serialize(outStream, pipelineState);
outStream.Seek(0, SeekOrigin.Begin);
await payloadBlob.UploadFromStreamAsync(outStream);
}
logger.LogInformation("Wrote blob output for message {0}", incomingMessage);
outgoingMessage.Add(incomingMessage);
logger.LogInformation("Wrote queue output for message {0}", incomingMessage);
}
else
{
logger.LogInformation("Finished processing {0}", incomingMessage);
}
});
}
}