SamplesV1/ADFSecurePublish/AdfKeyVaultDeployment/AdfDeploy.cs (263 lines of code) (raw):
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Hyak.Common;
using Microsoft.Azure;
using Microsoft.Azure.Management.DataFactories;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.ADF.Deployment.AdfKeyVaultDeployment.Models;
using Newtonsoft.Json.Linq;
namespace Microsoft.ADF.Deployment.AdfKeyVaultDeployment
{
/// <summary>
/// Publishes Azure Data Factory resources (custom activity packages, linked services,
/// tables and pipelines) described by a set of ADF files to a single data factory.
/// </summary>
public class AdfDeploy
{
    private const string AzureBatchSubType = "AzureBatch";

    private readonly string resourceManagerEndpoint = "https://management.azure.com/";
    private readonly string resourceGroupName;
    private readonly string dataFactoryName;
    private readonly IAdfFileHelper adfFileHelper;
    private readonly ILogger logger;
    private readonly IBlobUtilities blob;
    private readonly SettingsContext settingsContext;

    /// <summary>
    /// Initializes a new instance of the <see cref="AdfDeploy"/> class.
    /// </summary>
    /// <param name="adfFileHelper">Helper used to load the information of each ADF file.</param>
    /// <param name="logger">Logger that receives progress and error messages.</param>
    /// <param name="blob">Blob helper used to upload custom activity packages.</param>
    /// <param name="settingsContext">Settings used to build the management client credentials.</param>
    /// <param name="resourceGroupName">Name of the resource group passed to every create-or-update call.</param>
    /// <param name="dataFactoryName">Name of the data factory passed to every create-or-update call.</param>
    public AdfDeploy(IAdfFileHelper adfFileHelper, ILogger logger, IBlobUtilities blob, SettingsContext settingsContext, string resourceGroupName, string dataFactoryName)
    {
        this.logger = logger;
        this.blob = blob;
        this.adfFileHelper = adfFileHelper;
        this.resourceGroupName = resourceGroupName;
        this.dataFactoryName = dataFactoryName;
        this.settingsContext = settingsContext;
    }

    /// <summary>
    /// Deploys the specified ADF files and custom activity packages to Azure.
    /// </summary>
    /// <param name="filesToProcess">The files to process.</param>
    /// <param name="outputFolder">The output folder which contains the files and custom activity zips.</param>
    /// <param name="deployConfig">The deployment configuration information.</param>
    /// <returns>
    /// <c>true</c> when every resource was uploaded successfully; <c>false</c> when no valid
    /// file was found or when at least one upload failed. Individual upload failures are logged
    /// and do not stop the remaining resources from being deployed.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="filesToProcess"/> is null.</exception>
    public async Task<bool> Deploy(List<string> filesToProcess, string outputFolder, DeployConfigInfo deployConfig = null)
    {
        if (filesToProcess == null)
        {
            throw new ArgumentNullException(nameof(filesToProcess));
        }

        bool result = true;

        logger.Write(string.Empty);
        logger.Write($"Getting all ADF resources to deploy to Azure from output folder '{outputFolder}'", "Black");

        // Start loading every file first, then collect the results in input order.
        List<Task<AdfFileInfo>> allFilesTasks = filesToProcess.Select(async x => await adfFileHelper.GetFileInfo(x, deployConfig)).ToList();
        List<AdfFileInfo> allFiles = new List<AdfFileInfo>();

        foreach (var allFilesTask in allFilesTasks)
        {
            allFiles.Add(await allFilesTask);
        }

        List<AdfFileInfo> validFiles = allFiles.Where(x => x.IsValid).ToList();

        if (!validFiles.Any())
        {
            logger.Write($"No valid ADF files found in '{outputFolder}'", "Red");
            return false;
        }

        logger.Write($"{validFiles.Count} file{(validFiles.Count == 1 ? string.Empty : "s")} retreived");

        logger.Write(string.Empty);
        logger.Write($"Begin deploying ADF resources to '{dataFactoryName}'", "Black");

        // Log invalid files
        List<AdfFileInfo> invalidFiles = allFiles.Where(x => !x.IsValid).ToList();

        if (invalidFiles.Any())
        {
            logger.Write("The following files found in the output folder will not be published:");

            foreach (AdfFileInfo invalidFile in invalidFiles)
            {
                logger.Write(invalidFile.FileName);
            }
        }

        DataFactoryManagementClient client = GetDataFactoryManagementClient();

        // Deploy Package Zips before deploying ADF JSON files
        List<AdfFileInfo> pipelines = validFiles.Where(x => x.FileType == FileType.Pipeline).ToList();
        List<CustomActivityPackageInfo> packages = pipelines.SelectMany(x => x.CustomActivityPackages).Distinct().ToList();

        if (packages.Any())
        {
            if (!await DeployCustomActivities(packages, validFiles, outputFolder))
            {
                result = false;
            }
        }

        List<AdfFileInfo> linkedServices = validFiles.Where(x => x.FileType == FileType.LinkedService).ToList();

        if (linkedServices.Any())
        {
            logger.Write(string.Empty);
            logger.Write("Deploying LinkedServices", "Black");

            // Deploy non batch linked services first, and wait for them to finish before the
            // batch ones are started (presumably because a batch linked service refers to
            // another linked service that has to exist already - TODO confirm).
            if (!await DeployLinkedServices(client, linkedServices.Where(x => !IsAzureBatch(x)).ToList()))
            {
                result = false;
            }

            // Deploy batch linked services next
            if (!await DeployLinkedServices(client, linkedServices.Where(IsAzureBatch).ToList()))
            {
                result = false;
            }
        }

        List<AdfFileInfo> tables = validFiles.Where(x => x.FileType == FileType.Table).ToList();

        if (tables.Any())
        {
            logger.Write(string.Empty);
            logger.Write("Deploying tables", "Black");

            // Deploy tables next
            if (!await DeployTables(client, tables))
            {
                result = false;
            }
        }

        if (pipelines.Any())
        {
            logger.Write(string.Empty);
            logger.Write("Deploying pipelines", "Black");

            // Deploy pipelines last
            if (!await DeployPipelines(client, pipelines))
            {
                result = false;
            }
        }

        return result;
    }

    /// <summary>
    /// Determines whether the given linked service file has the "AzureBatch" sub type.
    /// The static comparison is used so a file without a sub type is treated as non-batch
    /// instead of causing a null reference.
    /// </summary>
    private static bool IsAzureBatch(AdfFileInfo linkedService)
    {
        return string.Equals(linkedService.SubType, AzureBatchSubType, StringComparison.InvariantCultureIgnoreCase);
    }

    /// <summary>
    /// Starts one upload and records its task under the resource name. Any failure while
    /// starting (including a second file that uses an already-seen name) is logged and
    /// reported through the return value instead of aborting the whole deployment.
    /// </summary>
    /// <param name="uploads">Uploads started so far, keyed by resource name.</param>
    /// <param name="file">The ADF file to upload.</param>
    /// <param name="resourceKind">Resource kind used in log messages, e.g. "table".</param>
    /// <param name="startUpload">Delegate that issues the create-or-update request.</param>
    /// <returns><c>true</c> when the upload was started; otherwise <c>false</c>.</returns>
    private bool TryStartUpload<TResponse>(
        Dictionary<string, Task<TResponse>> uploads,
        AdfFileInfo file,
        string resourceKind,
        Func<AdfFileInfo, Task<TResponse>> startUpload)
    {
        string name = file.Name;

        try
        {
            // Check before issuing the request so a duplicate never leaves an untracked upload running.
            if (uploads.ContainsKey(name))
            {
                logger.Write($"An error occurred uploading {resourceKind} '{name}': more than one file defines a resource with this name.", "Red");
                return false;
            }

            uploads.Add(name, startUpload(file));
            return true;
        }
        catch (Exception e)
        {
            logger.Write($"An error occurred uploading {resourceKind} '{name}': " + e.Message, "Red");
            logger.WriteError(e);
            return false;
        }
    }

    /// <summary>
    /// Uploads the given linked services concurrently and waits for all of them.
    /// </summary>
    /// <param name="client">The data factory management client.</param>
    /// <param name="linkedServices">The linked service files to upload.</param>
    /// <returns><c>true</c> when every linked service was uploaded successfully.</returns>
    private async Task<bool> DeployLinkedServices(DataFactoryManagementClient client, List<AdfFileInfo> linkedServices)
    {
        bool result = true;
        var uploads = new Dictionary<string, Task<LinkedServiceCreateOrUpdateResponse>>();

        foreach (AdfFileInfo linkedService in linkedServices)
        {
            bool started = TryStartUpload(
                uploads,
                linkedService,
                "linked service",
                file => client.LinkedServices.CreateOrUpdateWithRawJsonContentAsync(
                    resourceGroupName,
                    dataFactoryName,
                    file.Name,
                    new LinkedServiceCreateOrUpdateWithRawJsonContentParameters(file.FileContents)));

            if (!started)
            {
                result = false;
            }
        }

        foreach (KeyValuePair<string, Task<LinkedServiceCreateOrUpdateResponse>> item in uploads)
        {
            try
            {
                LinkedServiceCreateOrUpdateResponse response = await item.Value;

                // A response may come back without a resource body; fall back to the requested name.
                string name = response.LinkedService?.Name ?? item.Key;

                if (response.StatusCode == HttpStatusCode.OK)
                {
                    logger.Write($"Linked service '{name}' uploaded successfully.", "Green");
                }
                else
                {
                    logger.Write($"Linked service '{name}' did not upload successfully. Response status: {response.Status}", "Red");
                    result = false;
                }
            }
            catch (Exception e)
            {
                logger.Write($"Linked service '{item.Key}' did not upload successfully. Error: {e.Message}", "Red");
                logger.WriteError(e);
                result = false;
            }
        }

        return result;
    }

    /// <summary>
    /// Uploads the given tables (datasets) concurrently and waits for all of them.
    /// </summary>
    /// <param name="client">The data factory management client.</param>
    /// <param name="tables">The table files to upload.</param>
    /// <returns><c>true</c> when every table was uploaded successfully.</returns>
    private async Task<bool> DeployTables(DataFactoryManagementClient client, List<AdfFileInfo> tables)
    {
        bool result = true;
        var uploads = new Dictionary<string, Task<DatasetCreateOrUpdateResponse>>();

        foreach (AdfFileInfo table in tables)
        {
            bool started = TryStartUpload(
                uploads,
                table,
                "table",
                file => client.Datasets.CreateOrUpdateWithRawJsonContentAsync(
                    resourceGroupName,
                    dataFactoryName,
                    file.Name,
                    new DatasetCreateOrUpdateWithRawJsonContentParameters(file.FileContents)));

            if (!started)
            {
                result = false;
            }
        }

        foreach (KeyValuePair<string, Task<DatasetCreateOrUpdateResponse>> task in uploads)
        {
            try
            {
                DatasetCreateOrUpdateResponse response = await task.Value;

                if (response.StatusCode == HttpStatusCode.OK)
                {
                    logger.Write($"Table '{task.Key}' uploaded successfully.", "Green");
                }
                else
                {
                    logger.Write(
                        $"Table '{task.Key}' did not upload successfully. Response status: {response.Status}",
                        "Red");
                    result = false;
                }
            }
            catch (CloudException ex)
            {
                // The exception may carry no error details; a null Error must not throw from inside this handler.
                if (ex.Error?.Code == "TableAvailabilityUpdateNotSupported")
                {
                    logger.Write($"It looks like you are trying to change the availability for the Table '{task.Key}'. Currently this is not supported by ADF so as work around you should delete the dataset and related pipleline in the Data Factory '{dataFactoryName}' and run the publish again.", "Red");
                }
                else
                {
                    logger.Write($"Table '{task.Key}' did not upload successfully. An error occurred: " + ex.Message, "Red");
                }

                logger.WriteError(ex);
                result = false;
            }
            catch (Exception ex)
            {
                logger.WriteError(ex);
                logger.Write($"Table '{task.Key}' did not upload successfully. An error occurred: " + ex.Message, "Red");
                result = false;
            }
        }

        return result;
    }

    /// <summary>
    /// Uploads the given pipelines concurrently and waits for all of them.
    /// </summary>
    /// <param name="client">The data factory management client.</param>
    /// <param name="pipelines">The pipeline files to upload.</param>
    /// <returns><c>true</c> when every pipeline was uploaded successfully.</returns>
    private async Task<bool> DeployPipelines(DataFactoryManagementClient client, List<AdfFileInfo> pipelines)
    {
        bool result = true;
        var uploads = new Dictionary<string, Task<PipelineCreateOrUpdateResponse>>();

        foreach (AdfFileInfo pipeline in pipelines)
        {
            bool started = TryStartUpload(
                uploads,
                pipeline,
                "pipeline",
                file => client.Pipelines.CreateOrUpdateWithRawJsonContentAsync(
                    resourceGroupName,
                    dataFactoryName,
                    file.Name,
                    new PipelineCreateOrUpdateWithRawJsonContentParameters(file.FileContents)));

            if (!started)
            {
                result = false;
            }
        }

        foreach (KeyValuePair<string, Task<PipelineCreateOrUpdateResponse>> item in uploads)
        {
            try
            {
                PipelineCreateOrUpdateResponse response = await item.Value;

                // A response may come back without a resource body; fall back to the requested name.
                string name = response.Pipeline?.Name ?? item.Key;

                if (response.StatusCode == HttpStatusCode.OK)
                {
                    logger.Write($"Pipeline '{name}' uploaded successfully.", "Green");
                }
                else
                {
                    logger.Write($"Pipeline '{name}' did not upload successfully. Response status: {response.Status}", "Red");
                    result = false;
                }
            }
            catch (Exception e)
            {
                logger.WriteError(e);
                logger.Write($"An error occurred uploading pipeline '{item.Key}': " + e.Message, "Red");
                result = false;
            }
        }

        return result;
    }

    /// <summary>
    /// Deploys the custom activities.
    /// </summary>
    /// <param name="packages">The packages.</param>
    /// <param name="allFiles">All files.</param>
    /// <param name="outputFolder">The output folder.</param>
    /// <returns><c>true</c> when every package upload reported success.</returns>
    /// <exception cref="Exception">
    /// A package's <c>PackageFile</c> does not consist of exactly two '/'-separated parts.
    /// </exception>
    private async Task<bool> DeployCustomActivities(List<CustomActivityPackageInfo> packages, List<AdfFileInfo> allFiles, string outputFolder)
    {
        logger.Write(string.Empty);
        logger.Write("Deploying custom activity packages to blob storage", "Black");

        // Validate every package before the first upload is started, so a malformed entry
        // cannot abort this method while earlier uploads are still running unobserved.
        foreach (CustomActivityPackageInfo package in packages)
        {
            if (package.PackageFile.Split('/').Length != 2)
            {
                throw new Exception("packageFile should have only one '/' in it. Current packageFile value: " + package.PackageFile);
            }
        }

        var tasks = new List<Task<bool>>();

        foreach (CustomActivityPackageInfo package in packages)
        {
            // PackageFile has the form "<blob folder>/<file name>".
            string[] parts = package.PackageFile.Split('/');
            string blobFolder = parts[0];
            string localFilePath = Path.Combine(outputFolder, parts[1]);

            // Get connection string for blob account to upload to. It is null when the linked
            // service is not among the files or has no connectionString property; SelectToken
            // returns null for a missing path, hence the second null-conditional.
            JObject jObject = allFiles.FirstOrDefault(x => x.Name == package.PackageLinkedService)?.JObject;
            string connectionString = jObject?.SelectToken("$.properties.typeProperties.connectionString")?.ToObject<string>();

            tasks.Add(blob.UploadFile(localFilePath, connectionString, blobFolder));
        }

        var packageUploadResults = await Task.WhenAll(tasks);

        return packageUploadResults.All(x => x);
    }

    /// <summary>
    /// Gets the data factory management client.
    /// </summary>
    /// <returns>
    /// A client for the resource manager endpoint, authenticated with a token obtained
    /// for the subscription in the settings context.
    /// </returns>
    private DataFactoryManagementClient GetDataFactoryManagementClient()
    {
        TokenCloudCredentials aadTokenCredentials =
            new TokenCloudCredentials(settingsContext.SubscriptionId,
                AzureAccessUtilities.GetAuthorizationHeaderNoPopup(settingsContext));

        Uri resourceManagerUri = new Uri(resourceManagerEndpoint);

        DataFactoryManagementClient client = new DataFactoryManagementClient(aadTokenCredentials, resourceManagerUri);

        return client;
    }
}
}