in SmvLibrary/CloudSMVActionScheduler.cs [61:166]
public CloudSMVActionScheduler(SMVCloudConfig config)
{
// Set the instance GUID.
schedulerInstanceGuid = Guid.NewGuid().ToString();
Log.LogInfo("Scheduler Instance GUID: " + schedulerInstanceGuid);
Log.LogInfo($"Setting max dequeue count to {maxDequeueCount}");
// Check if the connection strings are set properly.
storageConnectionString = config.StorageConnectionString.value;
serviceBusConnectionString = config.ServiceBusConnectionString.value;
if (string.IsNullOrEmpty(storageConnectionString))
{
throw new Exception("Connection string \"Microsoft.WindowsAzure.Storage.ConnectionString\" is not set.");
}
if (string.IsNullOrEmpty(serviceBusConnectionString))
{
throw new Exception("Connection string \"Microsoft.ServiceBus.ConnectionString\" is not set.");
}
int retriesLeft = CloudConstants.MaxRetries;
while (true)
{
try
{
// Connect to cloud storage.
var storageAccount = CloudStorageAccount.Parse(storageConnectionString);
var queueStorage = storageAccount.CreateCloudQueueClient();
var blobStorage = storageAccount.CreateCloudBlobClient();
var tableStorage = storageAccount.CreateCloudTableClient();
// Set up blob storage.
blobStorage.DefaultRequestOptions = new BlobRequestOptions();
blobStorage.DefaultRequestOptions.RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(CloudConstants.RetryBackoffInterval), CloudConstants.MaxRetries);
inputContainer = blobStorage.GetContainerReference(CloudConstants.InputBlobContainerName);
outputContainer = blobStorage.GetContainerReference(CloudConstants.OutputBlobContainerName);
// Set up our queue.
queueStorage.DefaultRequestOptions = new QueueRequestOptions();
queueStorage.DefaultRequestOptions.RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(CloudConstants.RetryBackoffInterval), CloudConstants.MaxRetries);
actionsQueue = queueStorage.GetQueueReference(CloudConstants.InputQueueName);
// Set up table storage.
tableStorage.DefaultRequestOptions = new TableRequestOptions();
tableStorage.DefaultRequestOptions.RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(CloudConstants.RetryBackoffInterval), CloudConstants.MaxRetries);
CloudTable table = tableStorage.GetTableReference(CloudConstants.TableName);
tableDataSource = new ActionsTableDataSource(table);
// Set up the service bus subscription.
namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
var filter = new SqlFilter(String.Format(CultureInfo.CurrentCulture, "(SchedulerInstanceGuid = '{0}')", schedulerInstanceGuid));
var subDesc = new SubscriptionDescription(CloudConstants.ResultsTopicName, schedulerInstanceGuid);
subDesc.AutoDeleteOnIdle = TimeSpan.FromDays(7.0);
if (!namespaceManager.SubscriptionExists(CloudConstants.ResultsTopicName, schedulerInstanceGuid))
{
namespaceManager.CreateSubscription(subDesc, filter);
}
subscriptionClient = SubscriptionClient.CreateFromConnectionString(serviceBusConnectionString,
CloudConstants.ResultsTopicName, schedulerInstanceGuid);
subscriptionClient.RetryPolicy = new RetryExponential(
TimeSpan.FromSeconds(CloudConstants.RetryBackoffInterval),
TimeSpan.FromSeconds(CloudConstants.RetryBackoffInterval * CloudConstants.MaxRetries),
CloudConstants.MaxRetries);
subscriptionClient.OnMessage((msg) =>
{
try
{
ActionComplete(msg);
}
catch (Exception e)
{
Log.LogError("Exception when completing action for " + msg.MessageId);
Log.LogError(e.ToString());
cloudExceptionCallback(msg);
msg.Abandon();
}
});
// If we're here, we've successfully initialized everything and can break out of the retry loop.
break;
}
catch(MessagingEntityAlreadyExistsException e)
{
Thread.Sleep(60000);
if(e.IsTransient && retriesLeft > 0)
{
retriesLeft--;
}
else
{
throw;
}
}
catch (Exception e)
{
// We can fix the three exceptions below by using retry logic. But there's no point retrying for other exceptions.
if ((e is TimeoutException || e is ServerBusyException || e is MessagingCommunicationException) && retriesLeft > 0)
{
retriesLeft--;
}
else
{
throw;
}
}
}
}