in SamplesV1/ADFSecurePublish/AdfKeyVaultDeployment/AdfDeploy.cs [46:280]
public async Task<bool> Deploy(List<string> filesToProcess, string outputFolder, DeployConfigInfo deployConfig = null)
{
bool result = true;
logger.Write(string.Empty);
logger.Write($"Getting all ADF resources to deploy to Azure from output folder '{outputFolder}'", "Black");
List<Task<AdfFileInfo>> allFilesTasks = filesToProcess.Select(async x => await adfFileHelper.GetFileInfo(x, deployConfig)).ToList();
List<AdfFileInfo> allFiles = new List<AdfFileInfo>();
foreach (var allFilesTask in allFilesTasks)
{
allFiles.Add(await allFilesTask);
}
List<AdfFileInfo> validFiles = allFiles.Where(x => x.IsValid).ToList();
if (!validFiles.Any())
{
logger.Write($"No valid ADF files found in '{outputFolder}'", "Red");
return false;
}
logger.Write($"{validFiles.Count} file{(validFiles.Count == 1 ? string.Empty : "s")} retreived");
logger.Write(string.Empty);
logger.Write($"Begin deploying ADF resources to '{dataFactoryName}'", "Black");
// Log invalid files
List<AdfFileInfo> invalidFiles = allFiles.Where(x => !x.IsValid).ToList();
if (invalidFiles.Any())
{
logger.Write("The following files found in the output folder will not be published:");
foreach (AdfFileInfo invalidFile in invalidFiles)
{
logger.Write(invalidFile.FileName);
}
}
DataFactoryManagementClient client = GetDataFactoryManagementClient();
// Deploy Package Zips before deploying ADF JSON files
List<AdfFileInfo> pipelines = validFiles.Where(x => x.FileType == FileType.Pipeline).ToList();
List<CustomActivityPackageInfo> packages = pipelines.SelectMany(x => x.CustomActivityPackages).Distinct().ToList();
if (packages.Any())
{
result &= await DeployCustomActivities(packages, validFiles, outputFolder);
}
List<AdfFileInfo> linkedServices = validFiles.Where(x => x.FileType == FileType.LinkedService).ToList();
if (linkedServices.Any())
{
logger.Write(string.Empty);
logger.Write("Deploying LinkedServices", "Black");
// Deploy non batch linked services first
var linkedServiceTaskDict = new Dictionary<string, Task<LinkedServiceCreateOrUpdateResponse>>();
foreach (var linkedService in linkedServices.Where(x => !x.SubType.Equals("AzureBatch", StringComparison.InvariantCultureIgnoreCase)))
{
linkedServiceTaskDict.Add(linkedService.Name,
client.LinkedServices.CreateOrUpdateWithRawJsonContentAsync(resourceGroupName,
dataFactoryName, linkedService.Name,
new LinkedServiceCreateOrUpdateWithRawJsonContentParameters(linkedService.FileContents)));
}
foreach (var item in linkedServiceTaskDict)
{
try
{
LinkedServiceCreateOrUpdateResponse response = await item.Value;
if (response.StatusCode == HttpStatusCode.OK)
{
logger.Write($"Linked service '{response.LinkedService.Name}' uploaded successfully.", "Green");
}
else
{
logger.Write($"Linked service '{response.LinkedService.Name}' did not upload successfully. Response status: {response.Status}", "Red");
result = false;
}
}
catch (Exception e)
{
logger.Write($"Linked service '{item.Key}' did not upload successfully. Error: {e.Message}", "Red");
logger.WriteError(e);
result = false;
}
}
// Deploy batch linked services next
var batchLinkedServiceTaskDict = new Dictionary<string, Task<LinkedServiceCreateOrUpdateResponse>>();
foreach (var batchLinkedService in linkedServices.Where(x => x.SubType.Equals("AzureBatch", StringComparison.InvariantCultureIgnoreCase)))
{
batchLinkedServiceTaskDict.Add(batchLinkedService.Name,
client.LinkedServices.CreateOrUpdateWithRawJsonContentAsync(resourceGroupName,
dataFactoryName, batchLinkedService.Name,
new LinkedServiceCreateOrUpdateWithRawJsonContentParameters(batchLinkedService.FileContents)));
}
foreach (var item in batchLinkedServiceTaskDict)
{
try
{
LinkedServiceCreateOrUpdateResponse response = await item.Value;
if (response.StatusCode == HttpStatusCode.OK)
{
logger.Write($"Linked service '{response.LinkedService.Name}' uploaded successfully.", "Green");
}
else
{
logger.Write($"Linked service '{response.LinkedService.Name}' did not upload successfully. Response status: {response.Status}", "Red");
result = false;
}
}
catch (Exception e)
{
logger.Write($"Linked service '{item.Key}' did not upload successfully. Error: {e.Message}", "Red");
logger.WriteError(e);
result = false;
}
}
}
List<AdfFileInfo> tables = validFiles.Where(x => x.FileType == FileType.Table).ToList();
if (tables.Any())
{
logger.Write(string.Empty);
logger.Write("Deploying tables", "Black");
// Deploy tables next
var tableTaskDict = new Dictionary<string, Task<DatasetCreateOrUpdateResponse>>();
foreach (AdfFileInfo adfJsonFile in tables)
{
try
{
Task<DatasetCreateOrUpdateResponse> tableTask = client.Datasets.CreateOrUpdateWithRawJsonContentAsync(resourceGroupName, dataFactoryName,
adfJsonFile.Name, new DatasetCreateOrUpdateWithRawJsonContentParameters(adfJsonFile.FileContents));
tableTaskDict.Add(adfJsonFile.Name, tableTask);
}
catch (Exception e)
{
logger.Write($"An error occurred uploading table '{adfJsonFile.Name}': " + e.Message, "Red");
logger.WriteError(e);
result = false;
}
}
foreach (var task in tableTaskDict)
{
try
{
DatasetCreateOrUpdateResponse response = await task.Value;
if (response.StatusCode == HttpStatusCode.OK)
{
logger.Write($"Table '{task.Key}' uploaded successfully.", "Green");
}
else
{
logger.Write(
$"Table '{task.Key}' did not upload successfully. Response status: {response.Status}",
"Red");
result = false;
}
}
catch (CloudException ex)
{
if (ex.Error.Code == "TableAvailabilityUpdateNotSupported")
{
logger.Write($"It looks like you are trying to change the availability for the Table '{task.Key}'. Currently this is not supported by ADF so as work around you should delete the dataset and related pipleline in the Data Factory '{dataFactoryName}' and run the publish again.", "Red");
}
else
{
logger.Write($"Table '{task.Key}' did not upload successfully. An error occurred: " + ex.Message, "Red");
}
logger.WriteError(ex);
result = false;
}
catch (Exception ex)
{
logger.WriteError(ex);
logger.Write($"Table '{task.Key}' did not upload successfully. An error occurred: " + ex.Message, "Red");
result = false;
}
}
}
if (pipelines.Any())
{
logger.Write(string.Empty);
logger.Write("Deploying pipelines", "Black");
// Deploy pipelines last
var pipelineTaskDict = new Dictionary<string, Task<PipelineCreateOrUpdateResponse>>();
foreach (AdfFileInfo adfJsonFile in pipelines)
{
pipelineTaskDict.Add(adfJsonFile.Name, client.Pipelines.CreateOrUpdateWithRawJsonContentAsync(resourceGroupName, dataFactoryName,
adfJsonFile.Name, new PipelineCreateOrUpdateWithRawJsonContentParameters(adfJsonFile.FileContents)));
}
foreach (var item in pipelineTaskDict)
{
try
{
PipelineCreateOrUpdateResponse response = await item.Value;
if (response.StatusCode == HttpStatusCode.OK)
{
logger.Write($"Pipeline '{response.Pipeline.Name}' uploaded successfully.", "Green");
}
else
{
logger.Write($"Pipeline '{response.Pipeline.Name}' did not upload successfully. Response status: {response.Status}", "Red");
result = false;
}
}
catch (Exception e)
{
logger.WriteError(e);
logger.Write($"An error occurred uploading pipeline '{item.Key}': " + e.Message, "Red");
result = false;
}
}
}
return result;
}