in Source/Actions/Microsoft.Deployment.Actions.Salesforce/ADFDeployPipelines.cs [29:176]
/// <summary>
/// Builds and deploys one Azure Data Factory copy-pipeline ARM template per entry in the
/// "ADFPipelineJsonData" payload read from the request's data store.
/// </summary>
/// <remarks>
/// Every deployment is validated before any deployment is started, so a single invalid
/// template aborts the action without creating anything. Once all templates validate, the
/// deployments are started together and awaited as a group.
/// </remarks>
/// <param name="request">
/// Action request whose data store supplies the Azure token, subscription, resource group,
/// pipeline schedule settings and the serialized pipeline payload.
/// </param>
/// <returns>
/// A success response carrying the data factory name, or a failure response describing the
/// first validation error or the first deployment that did not report success.
/// </returns>
public override async Task<ActionResponse> ExecuteActionAsync(ActionRequest request)
{
    const string timestampFormat = "yyyy-MM-ddTHH:mm:ssZ";

    var token = request.DataStore.GetJson("AzureToken", "access_token");
    var subscription = request.DataStore.GetJson("SelectedSubscription", "SubscriptionId");
    var resourceGroup = request.DataStore.GetValue("SelectedResourceGroup");
    string schema = "dbo";
    var coreObjects = request.DataStore.GetValue("ObjectTables").SplitByCommaSpaceTabReturnList();
    string postDeploymentPipelineType = request.DataStore.GetValue("postDeploymentPipelineType");
    string pipelineFrequency = request.DataStore.GetValue("pipelineFrequency");
    string pipelineInterval = request.DataStore.GetValue("pipelineInterval");
    string pipelineType = request.DataStore.GetValue("pipelineType");
    string pipelineStart = request.DataStore.GetValue("pipelineStart");
    string pipelineEnd = request.DataStore.GetValue("pipelineEnd");
    bool historicalOnly = Convert.ToBoolean(request.DataStore.GetValue("historicalOnly"));
    string dataFactoryName = resourceGroup.Replace("_", string.Empty) + "SalesforceCopyFactory";

    // When a post-deployment pipeline type is supplied it overrides the regular schedule:
    // the pipeline starts now and runs until the maximum representable date.
    if (!string.IsNullOrWhiteSpace(postDeploymentPipelineType))
    {
        pipelineFrequency = request.DataStore.GetValue("postDeploymentPipelineFrequency");
        pipelineType = postDeploymentPipelineType;
        pipelineInterval = request.DataStore.GetValue("postDeploymentPipelineInterval");
        pipelineStart = DateTime.UtcNow.ToString(timestampFormat, CultureInfo.InvariantCulture);
        pipelineEnd = new DateTime(9999, 12, 31, 23, 59, 59).ToString(timestampFormat, CultureInfo.InvariantCulture);
    }

    // Defaults when no window was provided: from three years ago until now.
    if (string.IsNullOrWhiteSpace(pipelineStart))
    {
        pipelineStart = DateTime.UtcNow.AddYears(-3).ToString(timestampFormat, CultureInfo.InvariantCulture);
    }

    if (string.IsNullOrWhiteSpace(pipelineEnd))
    {
        pipelineEnd = DateTime.UtcNow.ToString(timestampFormat, CultureInfo.InvariantCulture);
    }

    var adfJsonData = request.DataStore.GetValue("ADFPipelineJsonData");
    var obj = JsonConvert.DeserializeObject(adfJsonData, typeof(DeserializedADFPayload)) as DeserializedADFPayload;
    obj = ReorderObjects(obj);

    // The template text is the same for every object, so read it from disk once. It is
    // re-parsed per iteration because each iteration mutates its own copy of the template.
    string pipelineTemplateText = System.IO.File.ReadAllText(Path.Combine(request.Info.App.AppFilePath, "Service/ADF/pipeline.json"));

    // Deployments are collected as deferred delegates so that nothing is started until
    // every template has passed validation.
    var pendingDeployments = new List<Func<Task<ActionResponse>>>();

    for (int i = 0; i < obj.fields.Count(); i++)
    {
        var o = obj.fields[i];
        string deploymentName = string.Concat("ADFPipeline", pipelineType, o.Item1);

        var param = new AzureArmParameterGenerator();
        param.AddStringParam("dataFactoryName", dataFactoryName);
        param.AddStringParam("targetSqlSchema", schema);
        param.AddStringParam("targetSqlTable", o.Item1.ToLowerInvariant());
        param.AddStringParam("targetSalesforceTable", o.Item1);
        param.AddStringParam("pipelineName", o.Item1 + "_CopyPipeline");
        param.AddStringParam("sqlWritableTypeName", o.Item1.ToLowerInvariant() + "type");
        param.AddStringParam("sqlWriterStoredProcedureName", "spMerge" + o.Item1.ToLowerInvariant());
        param.AddStringParam("pipelineStartDate", pipelineStart);
        param.AddStringParam("pipelineEndDate", pipelineEnd);
        param.AddStringParam("pipelineType", pipelineType);
        param.AddStringParam("sliceFrequency", pipelineFrequency);
        param.AddStringParam("sliceInterval", pipelineInterval);

        // Swap the template's own parameter section for the generated one.
        var armTemplate = JsonUtility.GetJsonObjectFromJsonString(pipelineTemplateText);
        var armParamTemplate = JsonUtility.GetJObjectFromObject(param.GetDynamicObject());
        armTemplate.Remove("parameters");
        armTemplate.Add("parameters", armParamTemplate["parameters"]);

        // Every pipeline after the first is linked to the entry that precedes it.
        if (i >= 1)
        {
            armTemplate = CreatePipelineDependency(pipelineType, obj, armTemplate, i - 1);
        }

        string tableFields = JsonConvert.SerializeObject(o.Item2);
        StringBuilder query;

        // During PreDeployment, listed objects other than Opportunity, Lead and
        // OpportunityLineItem get a query bounded by the pipeline window and a
        // one-time pipeline; everything else gets the unbounded query.
        if (o.Item1 != "Opportunity" &&
            o.Item1 != "Lead" &&
            o.Item1 != "OpportunityLineItem" &&
            pipelineType == "PreDeployment" &&
            coreObjects.Contains(o.Item1))
        {
            query = CreateQuery(o, tableFields, true, pipelineStart, pipelineEnd);
            armTemplate = CreateOneTimePipeline(armTemplate);
        }
        else
        {
            query = CreateQuery(o, tableFields, false);
        }

        if (historicalOnly && pipelineType == "PostDeployment")
        {
            armTemplate = this.PausePipeline(armTemplate);
        }

        string stringTemplate = ReplaceTableFieldsAndQuery(tableFields, query, armTemplate);

        // NOTE(review): a client is created per object and never disposed, as before;
        // confirm whether a single shared, disposed client is safe for concurrent use.
        SubscriptionCloudCredentials creds = new TokenCloudCredentials(subscription, token);
        ResourceManagementClient client = new ResourceManagementClient(creds);

        var deployment = new Microsoft.Azure.Management.Resources.Models.Deployment()
        {
            Properties = new DeploymentPropertiesExtended()
            {
                Template = stringTemplate,
                Parameters = JsonUtility.GetEmptyJObject().ToString()
            }
        };

        // Await instead of blocking on .Result: avoids tying up the calling thread and
        // surfaces the underlying exception rather than an AggregateException.
        var validate = await client.Deployments.ValidateAsync(resourceGroup, deploymentName, deployment, CancellationToken.None);
        if (!validate.IsValid)
        {
            return new ActionResponse(ActionStatus.Failure, JsonUtility.GetJObjectFromObject(validate), null,
                DefaultErrorCodes.DefaultErrorCode, $"Azure:{validate.Error.Message} Details:{validate.Error.Details}");
        }

        pendingDeployments.Add(async () =>
        {
            await client.Deployments.CreateOrUpdateAsync(resourceGroup, deploymentName, deployment, CancellationToken.None);
            var helper = new DeploymentHelper();
            return await helper.WaitForDeployment(client, resourceGroup, deploymentName);
        });
    }

    // Start every deployment on the thread pool so that any synchronous work inside a
    // deployment does not serialize the others, then wait for all of them together.
    var runningDeployments = new List<Task<ActionResponse>>(pendingDeployments.Count);
    foreach (var startDeployment in pendingDeployments)
    {
        runningDeployments.Add(Task.Run<ActionResponse>(startDeployment));
    }

    ActionResponse[] results = await Task.WhenAll(runningDeployments);
    foreach (var result in results)
    {
        if (result.Status != ActionStatus.Success)
        {
            return new ActionResponse(ActionStatus.Failure, result.ExceptionDetail.FriendlyErrorMessage);
        }
    }

    return new ActionResponse(ActionStatus.Success, JsonUtility.GetJObjectFromStringValue(dataFactoryName));
}