in SmvLibrary/CloudSMVActionScheduler.cs [229:301]
private void ActionComplete(BrokeredMessage message)
{
var actionGuid = (string)message.Properties["ActionGuid"];
var waitTime = (TimeSpan)message.Properties["WaitTime"]; // The amount of time the rule had to wait before it started being processed.
var dequeueCount = (int)message.Properties["DequeueCount"]; // The number of times the message we sent was dequeued by the workers.
message.Complete();
CloudActionCompleteContext context = contextDictionary[actionGuid];
ActionsTableEntry entry = tableDataSource.GetEntry(schedulerInstanceGuid, actionGuid);
var action = (SMVAction)Utility.ByteArrayToObject(entry.SerializedAction);
Log.LogDebug("Reached ActionComplete of Cloud " + action.GetFullName());
if(action.result == null)
{
action.result = new SMVActionResult(action.name, "NO OUTPUT?", false, false, 0);
Log.LogError(string.Format("Failed to complete action: {0} ({1})", actionGuid, context.action.name));
}
Log.LogDebug("ActionComplete for " + action.GetFullName() + " [cloud id " + actionGuid + "]");
// Populate the original action object so that the master scheduler gets the changes to the action object.
context.action.analysisProperty = action.analysisProperty;
context.action.result = action.result;
context.action.variables = action.variables;
var results = new SMVActionResult[] { context.action.result };
if (entry.Status != (int)ActionStatus.Complete)
{
Log.LogError(string.Format("Failed to complete action: {0} ({1})", actionGuid, context.action.name));
context.callback(context.action, new SMVActionResult[] { context.action.result }, context.context);
Utility.scheduler.errorsEncountered = true;
return;
}
// Download and extract the results.
string actionDirectory = Utility.GetActionDirectory(context.action);
CloudBlockBlob resultsBlob = outputContainer.GetBlockBlobReference(actionGuid + ".zip");
string zipPath = Path.Combine(Path.GetTempPath(), actionGuid);
if (File.Exists(zipPath)) File.Delete(zipPath);
if (resultsBlob.Exists())
{
resultsBlob.DownloadToFile(zipPath, FileMode.CreateNew);
resultsBlob.Delete();
using (var archive = ZipFile.OpenRead(zipPath))
{
foreach (var f in archive.Entries)
{
var toBeExtractedFilePath = Path.Combine(actionDirectory, f.FullName);
if (File.Exists(toBeExtractedFilePath))
{
File.Delete(toBeExtractedFilePath);
}
}
archive.ExtractToDirectory(actionDirectory);
}
File.Delete(zipPath);
// Write to the cloudstats.txt file.
var contents = new string[] { "Wait Time: " + waitTime.ToString() ,
"Dequeue Count: " + dequeueCount ,
"Output" + Environment.NewLine + results.First().output };
File.AppendAllLines(Path.Combine(actionDirectory, "cloudstats.txt"), contents);
Log.LogDebug("download results for " + action.GetFullName() + " [cloud id " + actionGuid + "]");
}
else
{
Log.LogInfo("Results for " + action.GetFullName() + " [cloud id " + actionGuid + "] not available!");
}
context.callback(context.action, results, context.context);
}