in SmvCloudWorker/WorkerRole.cs [66:234]
public override void Run()
{
while (acceptingMessages)
{
try
{
currentMessage = inputQueue.GetMessage(TimeSpan.FromHours(1));
if (currentMessage != null)
{
string schedulerInstanceGuid = String.Empty;
string actionGuid = String.Empty;
int maxDequeueCount = 10;
Boolean useDb = false;
string taskId = String.Empty;
try
{
CloudMessage message = JsonConvert.DeserializeObject<CloudMessage>(currentMessage.AsString);
schedulerInstanceGuid = message.schedulerInstanceGuid;
actionGuid = message.actionGuid;
maxDequeueCount = message.maxDequeueCount;
useDb = message.useDb;
taskId = message.taskId;
} catch(Exception e)
{
Log.LogInfo("Json message parsing failed. Trying old message parsing.");
// Parse the message.
string[] msgParts = currentMessage.AsString.Split(',');
schedulerInstanceGuid = msgParts[0];
actionGuid = msgParts[1];
maxDequeueCount = 10;
try
{
if (msgParts.Count() > 2)
{
maxDequeueCount = Convert.ToInt32(msgParts[2]);
}
}
catch (Exception) {
Log.LogError("Message parsing failed.");
}
}
// Get the table entry.
ActionsTableEntry tableEntry = tableDataSource.GetEntry(schedulerInstanceGuid, actionGuid);
tableDataSource.UpdateStatus(schedulerInstanceGuid, actionGuid, ActionStatus.InProgress);
CloudBlockBlob jobBlob = jobsContainer.GetBlockBlobReference(actionGuid + ".zip");
using (var outputMsg = new BrokeredMessage())
{
outputMsg.Properties["SchedulerInstanceGuid"] = schedulerInstanceGuid;
outputMsg.Properties["ActionGuid"] = actionGuid;
outputMsg.Properties["DequeueCount"] = currentMessage.DequeueCount;
outputMsg.Properties["WaitTime"] = DateTime.Now - currentMessage.InsertionTime;
// Check if we have tried to process this message too many times.
// If so, delete it and report an error back to the client.
if (currentMessage.DequeueCount >= maxDequeueCount)
{
tableDataSource.UpdateStatus(schedulerInstanceGuid, actionGuid, ActionStatus.Error);
SendMessageToTopic(outputMsg);
inputQueue.DeleteMessage(currentMessage);
jobBlob.Delete();
continue;
}
// Switch the version of SMV if required.
if (!SetSmvVersion(tableEntry.Version))
{
Trace.TraceError("Could not set SMV version.");
tableDataSource.UpdateStatus(schedulerInstanceGuid, actionGuid, ActionStatus.Error);
SendMessageToTopic(outputMsg);
inputQueue.DeleteMessage(currentMessage);
jobBlob.Delete();
continue;
}
// Download the job and extract it to the working directory.
Utility.ClearDirectory(workingDirectory);
string jobZipPath = Path.Combine(workingDirectory, "job.zip");
jobBlob.DownloadToFile(jobZipPath, FileMode.CreateNew);
ZipFile.ExtractToDirectory(jobZipPath, workingDirectory);
File.Delete(jobZipPath);
// Deserialize the action.
SMVAction action = (SMVAction)Utility.ByteArrayToObject(tableEntry.SerializedAction);
// Get ready to execute the action.
// We substitute the value of assemblyDir and workingDir with the values on this machine.
string oldWorkingDir = action.variables["workingDir"].ToLower();
string oldAssemblyDir = action.variables["assemblyDir"].ToLower();
string newAssemblyDir = Path.Combine(smvDirectory, "bin").ToLower();
workingDirectory = workingDirectory.ToLower();
var keys = new List<string>(action.variables.Keys);
foreach (var key in keys)
{
if (!string.IsNullOrEmpty(action.variables[key]))
{
if (action.variables[key].ToLower().StartsWith(oldAssemblyDir))
{
action.variables[key] = action.variables[key].ToLower().Replace(oldAssemblyDir, newAssemblyDir);
}
else if (action.variables[key].ToLower().StartsWith(oldWorkingDir))
{
action.variables[key] = action.variables[key].ToLower().Replace(oldWorkingDir, workingDirectory);
}
}
}
// NOTE: We set the Path attribute in the action to null because the action is always processed in the working directory.
var path = action.Path;
action.Path = null;
Utility.SetSmvVar("workingDir", workingDirectory);
// Execute the action.
SMVActionResult result = Utility.ExecuteAction(action, true, useDb, taskId);
// Change the paths back to their old values.
foreach (var key in keys)
{
if (!string.IsNullOrEmpty(action.variables[key]))
{
if (action.variables[key].ToLower().StartsWith(newAssemblyDir))
{
action.variables[key] = action.variables[key].ToLower().Replace(newAssemblyDir, oldAssemblyDir);
}
else if (action.variables[key].ToLower().StartsWith(workingDirectory))
{
action.variables[key] = action.variables[key].ToLower().Replace(workingDirectory, oldWorkingDir);
}
}
}
// Now set the path attribute again because the client needs it.
action.Path = path;
action.result.output = action.result.output.Substring(1, 900) + "... (truncated)";
// Zip up the working directory and upload it as the result.
string resultsZipPath = Path.Combine(resultsDirectory, actionGuid + ".zip");
ZipFile.CreateFromDirectory(workingDirectory, resultsZipPath);
CloudBlockBlob resultsBlob = resultsContainer.GetBlockBlobReference(actionGuid + ".zip");
resultsBlob.UploadFromFile(resultsZipPath);
File.Delete(resultsZipPath);
// Job done!
tableDataSource.UpdateAction(schedulerInstanceGuid, actionGuid, Utility.ObjectToByteArray(action));
tableDataSource.UpdateStatus(schedulerInstanceGuid, actionGuid, ActionStatus.Complete);
SendMessageToTopic(outputMsg);
if (currentMessage != null)
{
inputQueue.DeleteMessage(currentMessage);
currentMessage = null;
}
jobBlob.DeleteIfExists();
Utility.ClearDirectory(workingDirectory);
}
}
}
catch (Exception e)
{
Trace.TraceError("Exception while processing queue item:" + e.ToString());
if (currentMessage != null)
{
inputQueue.UpdateMessage(currentMessage, TimeSpan.FromSeconds(5), MessageUpdateFields.Visibility);
}
}
}
}