in code/Server/DataFactory/UpdateAppsTable.cs [67:200]
public IDictionary<string, string> Execute(
    IEnumerable<LinkedService> linkedServices,
    IEnumerable<Dataset> datasets,
    Activity activity,
    IActivityLogger logger)
{
    // Purpose: scans the input Azure table and, for every entity whose partition key is an
    // app handle owned by one of the owners named in "partitionKeyOwnerValueRule", asks
    // ReplaceColumnValue to set the configured column to that owner's value, then merges the
    // changed entities back in per-partition batches.
    //
    // Extended properties read from the activity:
    //   columnName                 (required) column to rewrite
    //   columnType                 (required) passed through to ReplaceColumnValue
    //   partitionKeyOwnerValueRule (required) "owner1=value1;owner2=value2;..."
    //   rowKeyPrefixes             (optional) comma-separated list; when present and non-empty,
    //                              only entities accepted by IsMatch are considered
    //
    // Returns an empty dictionary. Throws ArgumentException when a required property is missing
    // or a rule is malformed, and InvalidOperationException when the input dataset or its
    // linked service is not of the expected Azure type.
    DotNetActivity dotNetActivity = (DotNetActivity)activity.TypeProperties;
    IDictionary<string, string> extendedProperties = dotNetActivity.ExtendedProperties;

    logger.Write("Logging extended properties if any...");
    foreach (KeyValuePair<string, string> entry in extendedProperties)
    {
        logger.Write("<key:{0}> <value:{1}>", entry.Key, entry.Value);
    }

    string columnName;
    if (!extendedProperties.TryGetValue("columnName", out columnName))
    {
        throw new ArgumentException("Column name is required", "columnName");
    }

    string columnType;
    if (!extendedProperties.TryGetValue("columnType", out columnType))
    {
        throw new ArgumentException("Column Type information is required", "columnType");
    }

    // partitionKeyOwnerValueRule is required because it is the only source of replacement values.
    // There is no default value: a record whose owner has no rule is left untouched, so every
    // rule must be specified explicitly.
    string partitionKeyOwnerValueRule;
    if (!extendedProperties.TryGetValue("partitionKeyOwnerValueRule", out partitionKeyOwnerValueRule))
    {
        throw new ArgumentException("PartitionKeyOwnerValueRule information is required", "partitionKeyOwnerValueRule");
    }

    string[] rowKeyPrefixes = null;
    string rowKeyPrefixesValue;
    if (extendedProperties.TryGetValue("rowKeyPrefixes", out rowKeyPrefixesValue))
    {
        rowKeyPrefixes = rowKeyPrefixesValue.Split(',');
    }

    bool filterByRowKey = rowKeyPrefixes != null && rowKeyPrefixes.Length > 0;

    // Parse "owner=value;owner=value". Only the first '=' separates owner from value, so a value
    // may itself contain '='. A rule without '=' is rejected with a descriptive error instead of
    // surfacing as an IndexOutOfRangeException. Add throws ArgumentException on a duplicate owner.
    var partitionKeyOwnerValueRuleDict = new Dictionary<string, string>();
    string[] rules = partitionKeyOwnerValueRule.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
    foreach (string rule in rules)
    {
        string[] ownerAndValue = rule.Split(new[] { '=' }, 2);
        if (ownerAndValue.Length != 2)
        {
            throw new ArgumentException(
                string.Format("Rule '{0}' is not of the form <owner>=<value>", rule),
                "partitionKeyOwnerValueRule");
        }

        partitionKeyOwnerValueRuleDict.Add(ownerAndValue[0], ownerAndValue[1]);
    }

    // Resolve each app handle to its replacement value once, up front, considering ONLY owners
    // that have a rule. Looking the owner up across all of ownerAppHandles could select an owner
    // without a rule (when an app handle is listed under several owners) and fail with
    // KeyNotFoundException. The first owner in enumeration order wins for a shared app handle.
    var appHandles = new List<string>();
    var appHandleValues = new Dictionary<string, string>();
    foreach (var owner in ownerAppHandles)
    {
        string ownerValue;
        if (!partitionKeyOwnerValueRuleDict.TryGetValue(owner.Key, out ownerValue))
        {
            continue;
        }

        foreach (string appHandle in owner.Value)
        {
            appHandles.Add(appHandle);
            if (appHandle != null && !appHandleValues.ContainsKey(appHandle))
            {
                appHandleValues.Add(appHandle, ownerValue);
            }
        }
    }

    logger.Write("Matching appHandles:{0}", string.Join(",", appHandles));

    // For activities working on a single dataset, the first entry is the input dataset.
    // activity.Inputs can list further datasets purely to express pipeline workflow
    // dependencies; those are ignored here.
    var inputDatasetName = activity.Inputs.First().Name;
    Dataset inputDataset = datasets.Single(dataset => dataset.Name == inputDatasetName);

    AzureTableDataset sourceTable = inputDataset.Properties.TypeProperties as AzureTableDataset;
    if (sourceTable == null)
    {
        throw new InvalidOperationException(
            string.Format("Input dataset '{0}' is not an Azure table dataset", inputDataset.Name));
    }

    logger.Write("input table:{0}", sourceTable.TableName);

    AzureStorageLinkedService inputLinkedService =
        linkedServices.First(ls => ls.Name == inputDataset.Properties.LinkedServiceName).Properties.TypeProperties
        as AzureStorageLinkedService;
    if (inputLinkedService == null)
    {
        throw new InvalidOperationException(
            string.Format(
                "Linked service '{0}' is not an Azure storage linked service",
                inputDataset.Properties.LinkedServiceName));
    }

    // Create the storage client for the input table from the linked service's connection string.
    CloudStorageAccount inputStorageAccount = CloudStorageAccount.Parse(inputLinkedService.ConnectionString);
    CloudTableClient inputTableClient = inputStorageAccount.CreateCloudTableClient();
    CloudTable inputTable = inputTableClient.GetTableReference(sourceTable.TableName);

    long totalProcessedRecords = 0;
    long actualAffectedRecords = 0;
    TableContinuationToken tableContinuationToken = null;
    List<Task> tasks = new List<Task>();

    do
    {
        // An unfiltered query: the whole table is read one segment at a time.
        var resultSegment = inputTable.ExecuteQuerySegmented(new TableQuery(), tableContinuationToken);
        tableContinuationToken = resultSegment.ContinuationToken;

        var partitionGroups = resultSegment.Results
            .Where(entity => !filterByRowKey || this.IsMatch(entity.RowKey, rowKeyPrefixes))
            .GroupBy(entity => entity.PartitionKey);

        foreach (IGrouping<string, DynamicTableEntity> g in partitionGroups)
        {
            // Every entity in the group shares g.Key as its partition key, so the replacement
            // value is resolved once per group rather than once per entity.
            string newColumnValue;
            if (g.Key != null && appHandleValues.TryGetValue(g.Key, out newColumnValue))
            {
                TableBatchOperation batch = new TableBatchOperation();
                foreach (DynamicTableEntity e in g)
                {
                    if (this.ReplaceColumnValue(e, columnName, columnType, newColumnValue))
                    {
                        batch.Merge(e);
                        logger.Write("<partition key:{0}>, <row key:{1}>", e.PartitionKey, e.RowKey);
                    }
                }

                if (batch.Count > 0)
                {
                    // Batches are started here and awaited together after the scan completes;
                    // the counter therefore reflects submitted, not yet confirmed, merges.
                    tasks.Add(inputTable.ExecuteBatchInChunkAsync(batch));
                    actualAffectedRecords += batch.Count;
                }
            }

            // Logged for every partition group seen, whether or not anything was merged.
            logger.Write("Updated partition: {0}", g.Key);
        }

        totalProcessedRecords += resultSegment.Results.Count;
        logger.Write("Processed records count: {0}", totalProcessedRecords);
        logger.Write("Affected records count: {0}", actualAffectedRecords);
    }
    while (tableContinuationToken != null);

    // Block until every submitted batch has finished; failures surface as AggregateException.
    Task.WaitAll(tasks.ToArray());
    logger.Write("Updated {0} records", actualAffectedRecords);
    return new Dictionary<string, string>();
}