SamplesV1/AzureAnalysisServicesProcessSample/ProcessAzureASActivity.cs (193 lines of code) (raw):
namespace AzureAnalysisServicesProcessSample
{
using Microsoft.AnalysisServices.Tabular;
using Microsoft.AnalysisServices.AdomdClient;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System.IO;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using System.Globalization;
/// <summary>
/// Custom activity to process a Tabular model.
/// </summary>
public class ProcessAzureASActivity : CrossAppDomainDotNetActivity<ProcessAzureASContext>
{
/// <summary>
/// Names of the parameters used in the Activity JSON.
/// </summary>
const string TABULAR_DATABASE_NAME_PARAMETER_NAME = "TabularDatabaseName";
const string AZUREAS_CONNECTION_STRING_PARAMETER_NAME = "AzureASConnectionString";
const string ADV_AS_PROCESS_SCRIPT_PATH_PARAMETER_NAME = "AdvancedASProcessingScriptPath";
const string AZUREAD_AUTHORITY_PARAMETER_NAME = "AzureADAuthority";
const string AZUREAD_RESOURCE_PARAMETER_NAME = "AzureADResource";
const string AZUREAD_CLIENTID_PARAMETER_NAME = "AzureADClientId";
const string AZUREAD_CLIENTSECRETPATH_PARAMETER_NAME = "AzureADClientSecretPath";
internal override ProcessAzureASContext PreExecute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
{
ValidateParameters(linkedServices, datasets, activity, logger);
return CreateContext(linkedServices, datasets, activity, logger);
}
public override IDictionary<string, string> Execute(ProcessAzureASContext context, IActivityLogger logger)
{
logger.Write("Starting ProcessAzureASActivity");
if (string.IsNullOrEmpty(context.AdvancedASProcessingScriptPath))
{
logger.Write("No custom TMSL script specified, process perform full process of the database");
try
{
Model tabularModel = GetTabularModel(context.AzureASConnectionString, context.TabularDatabaseName);
ProcessTabularModel(tabularModel, logger);
logger.Write("Finalizing ProcessAzureASActivity");
}
catch (Exception ex)
{
logger.Write(ex.Message);
throw;
}
}
else
{
logger.Write("Custom TMSL script specified, perform action defined in TMSL script");
try
{
using (AdomdConnection asConn = new AdomdConnection(context.AzureASConnectionString))
{
asConn.Open();
foreach (string scriptPath in context.AdvancedASProcessingScriptPath.Split(';'))
{
string commandText = ReadBlob(context.BlobStorageConnectionString, scriptPath);
AdomdCommand asCmd = asConn.CreateCommand();
asCmd.CommandText = commandText;
asCmd.ExecuteNonQuery();
logger.Write("Azure AS was successfully processed");
}
}
}
catch (Exception ex)
{
logger.Write(ex.Message);
throw;
}
}
return new Dictionary<string, string>();
}
internal virtual Model GetTabularModel(string aasConnectionString, string tabularDatabaseName)
{
var analysisServicesServer = new Server();
analysisServicesServer.Connect(aasConnectionString);
var tabularDatabase = analysisServicesServer.Databases.FindByName(tabularDatabaseName);
if (tabularDatabase == null) {
throw new ArgumentException("Database not found", tabularDatabaseName);
}
return tabularDatabase.Model;
}
internal virtual void ProcessTabularModel(Model tabularModel, IActivityLogger logger)
{
// We request a refresh for all tables
foreach (var table in tabularModel.Tables)
{
// For partition tables, we process each partition.
if (table.Partitions.Any())
{
logger.Write("Table {0} will be processed partition by partition", table.Name);
foreach (var partition in table.Partitions)
{
partition.RequestRefresh(RefreshType.Full);
}
}
else
{
logger.Write("Table {0} will be processed in full mode", table.Name);
table.RequestRefresh(RefreshType.Full);
}
}
logger.Write("Azure AS processing started");
tabularModel.SaveChanges();
logger.Write("Azure AS was successfully processed");
}
private void ValidateParameters(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets,
Activity activity, IActivityLogger logger)
{
if (linkedServices == null) throw new ArgumentNullException("linkedServices");
if (datasets == null) throw new ArgumentNullException("datasets");
if (activity == null) throw new ArgumentNullException("activity");
if (logger == null) throw new ArgumentNullException("logger");
// Verify datasets
if (activity.Outputs.Count != 1) throw new ArgumentException("Only one output datasets is required, as a dummy");
foreach (LinkedService ls in linkedServices)
logger.Write("Detected linkedService.Name {0}", ls.Name);
DotNetActivity dotNetActivity = (DotNetActivity)activity.TypeProperties;
// Ensure required parameters are included
if (!dotNetActivity.ExtendedProperties.ContainsKey(ADV_AS_PROCESS_SCRIPT_PATH_PARAMETER_NAME)) {
if (!dotNetActivity.ExtendedProperties.ContainsKey(TABULAR_DATABASE_NAME_PARAMETER_NAME)) throw new ArgumentException(TABULAR_DATABASE_NAME_PARAMETER_NAME);
}
if (!dotNetActivity.ExtendedProperties.ContainsKey(AZUREAS_CONNECTION_STRING_PARAMETER_NAME)) throw new ArgumentException(AZUREAS_CONNECTION_STRING_PARAMETER_NAME);
logger.Write("Parameters validated");
}
private ProcessAzureASContext CreateContext(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
{
//Get Azure Storage Linked Service Connection String from the dummy output dataset,
//AS processing does not produce output dataset, so we use this to access the TMSL script for AS processing
AzureStorageLinkedService outputLinkedService;
Dataset outputDataset = datasets.Single(dataset => dataset.Name == activity.Outputs.Single().Name);
AzureBlobDataset outputTypeProperties;
outputTypeProperties = outputDataset.Properties.TypeProperties as AzureBlobDataset;
// get the Azure Storate linked service from linkedServices object
outputLinkedService = linkedServices.First(
linkedService =>
linkedService.Name ==
outputDataset.Properties.LinkedServiceName).Properties.TypeProperties
as AzureStorageLinkedService;
// get the connection string in the linked service
string blobconnectionString = outputLinkedService.ConnectionString;
DotNetActivity dotNetActivity = (DotNetActivity)activity.TypeProperties;
var tabularDatabaseName = dotNetActivity.ExtendedProperties[TABULAR_DATABASE_NAME_PARAMETER_NAME];
var aasConnectionString = dotNetActivity.ExtendedProperties[AZUREAS_CONNECTION_STRING_PARAMETER_NAME];
var advASProcessingScriptPath="";
if (dotNetActivity.ExtendedProperties.ContainsKey(ADV_AS_PROCESS_SCRIPT_PATH_PARAMETER_NAME))
{
advASProcessingScriptPath = dotNetActivity.ExtendedProperties[ADV_AS_PROCESS_SCRIPT_PATH_PARAMETER_NAME];
}
if (dotNetActivity.ExtendedProperties.ContainsKey(AZUREAD_AUTHORITY_PARAMETER_NAME))
{
aasConnectionString = GetAzureADToken(blobconnectionString, dotNetActivity, aasConnectionString);
}
return new ProcessAzureASContext
{
TabularDatabaseName = tabularDatabaseName,
AzureASConnectionString = aasConnectionString,
AdvancedASProcessingScriptPath= advASProcessingScriptPath,
BlobStorageConnectionString= blobconnectionString
};
}
private static string GetAzureADToken(string blobConnectionString, DotNetActivity dotNetActivity, string aasConnectionString)
{
string authority = dotNetActivity.ExtendedProperties[AZUREAD_AUTHORITY_PARAMETER_NAME];
string resource = dotNetActivity.ExtendedProperties[AZUREAD_RESOURCE_PARAMETER_NAME];
string clientId = dotNetActivity.ExtendedProperties[AZUREAD_CLIENTID_PARAMETER_NAME];
string clientSecretPath = dotNetActivity.ExtendedProperties[AZUREAD_CLIENTSECRETPATH_PARAMETER_NAME];
string clientSecret = ReadBlob(blobConnectionString, clientSecretPath);
AuthenticationContext authContext = new AuthenticationContext(authority);
ClientCredential cc = new ClientCredential(clientId, clientSecret);
var task = authContext.AcquireTokenAsync(resource, cc);
task.Wait();
AuthenticationResult token = task.Result;
aasConnectionString = string.Format(CultureInfo.InvariantCulture, aasConnectionString, token.AccessToken);
return aasConnectionString;
}
private static string ReadBlob(string blobConnectionString, string blobPath)
{
string[] pathArr = blobPath.Split("/".ToCharArray(), 2);
if (pathArr.Count() < 2)
{
throw new ArgumentException("Missing container name", ADV_AS_PROCESS_SCRIPT_PATH_PARAMETER_NAME);
}
string container = pathArr.First();
string filepath = pathArr.Last();
CloudStorageAccount inputStorageAccount = CloudStorageAccount.Parse(blobConnectionString);
CloudBlobClient inputClient = inputStorageAccount.CreateCloudBlobClient();
CloudBlobContainer inputContainer = inputClient.GetContainerReference(container);
CloudBlockBlob blockBlob = inputContainer.GetBlockBlobReference(filepath);
string CmdStr;
using (var memoryStream = new MemoryStream())
{
blockBlob.DownloadToStream(memoryStream);
memoryStream.Position = 0;
StreamReader CmdReader = new StreamReader(memoryStream);
CmdStr = CmdReader.ReadToEnd();
}
return CmdStr;
}
}
}