in AsPartitionProcessing/AsPartitionProcessing/PartitionProcessor.cs [55:257]
private static void PerformProcessing()
{
Server server = new Server();
try
{
Database database;
Connect(server, out database);
Console.ForegroundColor = ConsoleColor.White;
LogMessage($"Start: {DateTime.Now.ToString("hh:mm:ss tt")}", MessageType.Informational, false);
LogMessage($"Server: {_modelConfiguration.AnalysisServicesServer}", MessageType.Informational, false);
LogMessage($"Database: {_modelConfiguration.AnalysisServicesDatabase}", MessageType.Informational, false);
Console.ForegroundColor = ConsoleColor.Yellow;
foreach (TableConfiguration tableConfiguration in _modelConfiguration.TableConfigurations)
{
Table table = database.Model.Tables.Find(tableConfiguration.AnalysisServicesTable);
if (table == null)
{
throw new Microsoft.AnalysisServices.ConnectionException($"Could not connect to table {tableConfiguration.AnalysisServicesTable}.");
}
if (tableConfiguration.PartitioningConfigurations.Count == 0)
{
//Non-partitioned table. Process at table level.
LogMessage("", MessageType.Informational, false);
LogMessage($"Non-partitioned processing for table {tableConfiguration.AnalysisServicesTable}", MessageType.Informational, false);
LogMessage(new String('-', tableConfiguration.AnalysisServicesTable.Length + 37), MessageType.Informational, false);
if (_modelConfiguration.IncrementalOnline)
{
LogMessage($"Process table {tableConfiguration.AnalysisServicesTable} /Full", MessageType.Informational, true);
table.RequestRefresh(RefreshType.Full);
}
else
{
LogMessage($"Process table {tableConfiguration.AnalysisServicesTable} /DataOnly", MessageType.Informational, true);
table.RequestRefresh(RefreshType.DataOnly);
}
}
else
{
//Validate multiple granularity ranges.
tableConfiguration.ValidatePartitioningConfigurations();
//Find template partition.
Partition templatePartition = table.Partitions.Find(tableConfiguration.AnalysisServicesTable);
if (templatePartition == null)
{
throw new InvalidOperationException($"Table {tableConfiguration.AnalysisServicesTable} does not contain a partition with the same name to act as the template partition.");
}
//Process based on partitioning configuration(s).
foreach (PartitioningConfiguration partitioningConfiguration in tableConfiguration.PartitioningConfigurations)
{
LogMessage("", MessageType.Informational, false);
LogMessage($"Rolling-window partitioning for table {tableConfiguration.AnalysisServicesTable}", MessageType.Informational, false);
LogMessage(new String('-', tableConfiguration.AnalysisServicesTable.Length + 38), MessageType.Informational, false);
//Figure out what processing needs to be done
List<string> partitionKeysCurrent = GetPartitionKeysCurrent(table, partitioningConfiguration.Granularity);
List<string> partitionKeysNew = GetPartitionKeysTarget(false, partitioningConfiguration, partitioningConfiguration.Granularity);
List<string> partitionKeysForProcessing = GetPartitionKeysTarget(true, partitioningConfiguration, partitioningConfiguration.Granularity);
DisplayPartitionRange(partitionKeysCurrent, true, partitioningConfiguration.Granularity);
DisplayPartitionRange(partitionKeysNew, false, partitioningConfiguration.Granularity);
LogMessage("", MessageType.Informational, false);
LogMessage("=>Actions & progress:", MessageType.Informational, false);
//Check for old partitions that need to be removed
foreach (string partitionKey in partitionKeysCurrent)
{
if (Convert.ToInt32(partitionKey) < Convert.ToInt32(partitionKeysNew[0]))
{
LogMessage($"Remove old partition {DateFormatPartitionKey(partitionKey, partitioningConfiguration.Granularity)}", MessageType.Informational, true);
table.Partitions.Remove(partitionKey);
}
}
//Process partitions
foreach (string partitionKey in partitionKeysForProcessing)
{
Partition partitionToProcess = table.Partitions.Find(partitionKey);
if (partitionToProcess == null)
{
partitionToProcess = CreateNewPartition(table, templatePartition, partitioningConfiguration, partitionKey, partitioningConfiguration.Granularity);
LogMessage($"Create new partition {DateFormatPartitionKey(partitionKey, partitioningConfiguration.Granularity)}", MessageType.Informational, true);
if (!_modelConfiguration.InitialSetUp)
{
IncrementalProcessPartition(partitionKey, partitionToProcess, partitioningConfiguration.Granularity);
}
}
else if (!_modelConfiguration.InitialSetUp)
{
//Existing partition for processing
IncrementalProcessPartition(partitionKey, partitionToProcess, partitioningConfiguration.Granularity);
}
if (_modelConfiguration.InitialSetUp)
{
if (partitionToProcess.State != ObjectState.Ready)
{
//Process new partitions sequentially during initial setup so don't run out of memory
LogMessage($"Sequentially process {DateFormatPartitionKey(partitionKey, partitioningConfiguration.Granularity)} /DataOnly", MessageType.Informational, true);
partitionToProcess.RequestRefresh(RefreshType.DataOnly);
database.Model.SaveChanges();
}
else
{
//Partition already exists during initial setup (and is fully processed), so ignore it
LogMessage($"Partition {DateFormatPartitionKey(partitionKey, partitioningConfiguration.Granularity)} already exists and is processed", MessageType.Informational, true);
}
}
}
}
//Ensure template partition doesn't contain any data
if (_modelConfiguration.InitialSetUp)
{
string beginParam = GetDateKey("19010102", Granularity.Daily, tableConfiguration.PartitioningConfigurations[0].IntegerDateKey, false, templatePartition.Source is MPartitionSource);
string endParam = GetDateKey("19010101", Granularity.Daily, tableConfiguration.PartitioningConfigurations[0].IntegerDateKey, false, templatePartition.Source is MPartitionSource);
//Query generated will always return nothing
string query = tableConfiguration.PartitioningConfigurations[0].TemplateSourceQuery.Replace("{0}", beginParam).Replace("{1}", endParam);
switch (templatePartition.Source)
{
case MPartitionSource mSource:
mSource.Expression = query;
break;
case QueryPartitionSource querySource:
querySource.Query = query;
break;
}
templatePartition.RequestRefresh(RefreshType.DataOnly);
}
}
}
//Commit the data changes, and bring model back online if necessary
LogMessage("", MessageType.Informational, false);
LogMessage("Final operations", MessageType.Informational, false);
LogMessage(new String('-', 16), MessageType.Informational, false);
//Save changes setting MaxParallelism if necessary
if (_modelConfiguration.MaxParallelism == -1)
{
LogMessage("Save changes ...", MessageType.Informational, true);
database.Model.SaveChanges();
}
else
{
LogMessage($"Save changes with MaxParallelism={Convert.ToString(_modelConfiguration.MaxParallelism)}...", MessageType.Informational, true);
database.Model.SaveChanges(new SaveOptions() { MaxParallelism = _modelConfiguration.MaxParallelism });
}
//Perform recalc if necessary
if (_modelConfiguration.InitialSetUp || (!_modelConfiguration.InitialSetUp && !_modelConfiguration.IncrementalOnline))
{
LogMessage("Recalc model to bring back online ...", MessageType.Informational, true);
database.Model.RequestRefresh(RefreshType.Calculate);
database.Model.SaveChanges();
}
Console.ForegroundColor = ConsoleColor.White;
LogMessage("", MessageType.Informational, false);
LogMessage("Finish: " + DateTime.Now.ToString("hh:mm:ss tt"), MessageType.Informational, false);
}
catch (Exception exc)
{
Console.ForegroundColor = ConsoleColor.Red;
LogMessage("", MessageType.Informational, false);
LogMessage($"Exception occurred: {DateTime.Now.ToString("hh:mm:ss tt")}", MessageType.Error, false);
LogMessage($"Exception message: {exc.Message}", MessageType.Error, false);
if (exc.InnerException != null)
{
LogMessage($"Inner exception message: {exc.InnerException.Message}", MessageType.Error, false);
}
LogMessage("", MessageType.Informational, false);
Console.ForegroundColor = ConsoleColor.White;
//Auto retry
if (_retryAttempts > 0)
{
LogMessage($"Retry attempts remaining: {Convert.ToString(_retryAttempts)}. Will wait {Convert.ToString(_modelConfiguration.RetryWaitTimeSeconds)} seconds and then attempt retry.", MessageType.Informational, false);
_retryAttempts -= 1;
Thread.Sleep(_modelConfiguration.RetryWaitTimeSeconds * 1000);
PerformProcessing();
}
}
finally
{
try
{
_modelConfiguration = null;
_messageLogger = null;
if (server != null) server.Disconnect();
}
catch { }
}
}