in code/Server/DataFactory/TransformColumn.cs [53:190]
/// <summary>
/// Custom activity entry point. Scans the input Azure table and, for every entity whose row key is
/// selected by <c>rowKeyPrefixes</c> (all entities when that property is absent), applies an optional
/// column-value replacement and/or an optional row-key substring replacement, then writes the changed
/// entities back in per-partition batches.
/// </summary>
/// <param name="linkedServices">Linked services of the pipeline; the input dataset's linked service must be an Azure Storage linked service.</param>
/// <param name="datasets">Datasets of the pipeline; the activity's first input must be an Azure table dataset.</param>
/// <param name="activity">The running activity; its type properties must be a <c>DotNetActivity</c>.</param>
/// <param name="logger">Receives progress messages.</param>
/// <returns>An empty dictionary.</returns>
/// <remarks>
/// Extended properties read by this method:
/// <list type="bullet">
/// <item><c>rowKeyPrefixes</c>: comma-separated values passed to <c>IsMatch</c> to select rows (presumably prefix matching — see that method).</item>
/// <item><c>columnName</c>; when present, <c>columnType</c>, <c>ifColumnValueMatches</c> and <c>replaceColumnValueWith</c> are required and passed to <c>ReplaceIfMatch</c>.</item>
/// <item><c>ifRowKeyContains</c> (must be non-empty); when present, <c>replaceRowKeySubStrWith</c> is required. Every occurrence in the row key is replaced; the entity is inserted under the new key and the old row is deleted.</item>
/// </list>
/// </remarks>
/// <exception cref="KeyNotFoundException">A required companion extended property is missing.</exception>
/// <exception cref="System.ArgumentException"><c>ifRowKeyContains</c> is present but empty.</exception>
/// <exception cref="System.InvalidOperationException">The input dataset is not an Azure table, or its linked service is not Azure Storage.</exception>
public IDictionary<string, string> Execute(
    IEnumerable<LinkedService> linkedServices,
    IEnumerable<Dataset> datasets,
    Microsoft.Azure.Management.DataFactories.Models.Activity activity,
    IActivityLogger logger)
{
    DotNetActivity dotNetActivity = (DotNetActivity)activity.TypeProperties;

    // Nothing visible here guarantees ExtendedProperties is non-null; treat null as "no properties".
    IDictionary<string, string> extendedProperties =
        dotNetActivity.ExtendedProperties ?? new Dictionary<string, string>();
    logger.Write("Logging extended properties if any...");
    foreach (KeyValuePair<string, string> entry in extendedProperties)
    {
        logger.Write("<key:{0}> <value:{1}>", entry.Key, entry.Value);
    }

    // Optional row filter.
    string rowKeyPrefixesValue;
    string[] rowKeyPrefixes = null;
    if (extendedProperties.TryGetValue("rowKeyPrefixes", out rowKeyPrefixesValue) && rowKeyPrefixesValue != null)
    {
        rowKeyPrefixes = rowKeyPrefixesValue.Split(',');
    }
    bool filterByPrefix = rowKeyPrefixes != null && rowKeyPrefixes.Length > 0;

    // Optional column-value replacement; its three companion properties are mandatory once columnName is set.
    string columnName;
    string columnType = string.Empty, ifColumnValueMatches = string.Empty, replaceColumnValueWith = string.Empty;
    bool hasColumnUpdate = extendedProperties.TryGetValue("columnName", out columnName);
    if (hasColumnUpdate)
    {
        columnType = GetRequiredExtendedProperty(extendedProperties, "columnType");
        ifColumnValueMatches = GetRequiredExtendedProperty(extendedProperties, "ifColumnValueMatches");
        replaceColumnValueWith = GetRequiredExtendedProperty(extendedProperties, "replaceColumnValueWith");
    }
    else
    {
        columnName = string.Empty;
    }

    // Optional row-key substring replacement.
    string ifRowKeyContains;
    string replaceRowKeySubStrWith = string.Empty;
    bool hasRowKeyUpdate = extendedProperties.TryGetValue("ifRowKeyContains", out ifRowKeyContains);
    if (hasRowKeyUpdate)
    {
        // An empty search string would match every row and make string.Replace throw; reject it before any write.
        if (string.IsNullOrEmpty(ifRowKeyContains))
        {
            throw new System.ArgumentException("Extended property 'ifRowKeyContains' must not be empty.", "activity");
        }
        replaceRowKeySubStrWith = GetRequiredExtendedProperty(extendedProperties, "replaceRowKeySubStrWith");
    }
    else
    {
        ifRowKeyContains = string.Empty;
    }

    // Only the first input dataset is transformed; any further inputs exist only to express
    // pipeline workflow dependencies and are ignored.
    string inputDatasetName = activity.Inputs.First().Name;
    Dataset inputDataset = datasets.Single(dataset => dataset.Name == inputDatasetName);
    AzureTableDataset sourceTable = inputDataset.Properties.TypeProperties as AzureTableDataset;
    if (sourceTable == null)
    {
        throw new System.InvalidOperationException(
            string.Format("Input dataset '{0}' is not an Azure table dataset.", inputDatasetName));
    }
    logger.Write("input table:{0}", sourceTable.TableName);

    AzureStorageLinkedService inputLinkedService = linkedServices.First(
        ls => ls.Name == inputDataset.Properties.LinkedServiceName).Properties.TypeProperties
        as AzureStorageLinkedService;
    if (inputLinkedService == null)
    {
        throw new System.InvalidOperationException(string.Format(
            "Linked service '{0}' of input dataset '{1}' is not an Azure Storage linked service.",
            inputDataset.Properties.LinkedServiceName,
            inputDatasetName));
    }

    CloudStorageAccount inputStorageAccount = CloudStorageAccount.Parse(inputLinkedService.ConnectionString);
    CloudTableClient inputTableClient = inputStorageAccount.CreateCloudTableClient();
    CloudTable inputTable = inputTableClient.GetTableReference(sourceTable.TableName);

    long totalProcessedRecords = 0;
    long actualAffectedRecords = 0;

    // (PartitionKey, RowKey) of entities this run inserted under a new row key. A later segment of the
    // scan can return them again when the new key sorts after the current position; re-applying a
    // rename such as "a" -> "ab" would rename them again, so they are skipped.
    HashSet<System.Tuple<string, string>> renamedKeys = new HashSet<System.Tuple<string, string>>();

    TableContinuationToken continuationToken = null;
    do
    {
        var resultSegment = inputTable.ExecuteQuerySegmented(new TableQuery(), continuationToken);
        continuationToken = resultSegment.ContinuationToken;

        var partitionGroups = resultSegment.Results
            .Where(entity => !filterByPrefix || this.IsMatch(entity.RowKey, rowKeyPrefixes))
            .GroupBy(entity => entity.PartitionKey);

        // One batch per partition; this segment's batches run concurrently and are awaited before the
        // next segment is fetched, so in-flight work stays bounded and failures stop the scan early.
        List<Task> segmentTasks = new List<Task>();
        foreach (IGrouping<string, DynamicTableEntity> group in partitionGroups)
        {
            TableBatchOperation batch = new TableBatchOperation();
            foreach (DynamicTableEntity entity in group)
            {
                if (renamedKeys.Contains(System.Tuple.Create(entity.PartitionKey, entity.RowKey)))
                {
                    continue;
                }

                string originalRowKey = entity.RowKey;
                bool recordUpdated = hasColumnUpdate
                    && this.ReplaceIfMatch(entity, columnName, columnType, ifColumnValueMatches, replaceColumnValueWith);
                bool requiresDelete = false;

                if (hasRowKeyUpdate && originalRowKey.Contains(ifRowKeyContains))
                {
                    string newRowKey = originalRowKey.Replace(ifRowKeyContains, replaceRowKeySubStrWith);

                    // A rename onto the same key (replacement equals the search text) is a no-op. Queuing
                    // an insert and a delete for one key would be rejected by the batch, and the delete
                    // would remove the row.
                    if (newRowKey != originalRowKey)
                    {
                        entity.RowKey = newRowKey;
                        recordUpdated = true;
                        requiresDelete = true;
                    }
                }

                if (!recordUpdated)
                {
                    continue;
                }

                if (requiresDelete)
                {
                    // A row key cannot be changed in place: insert under the new key, delete the old row.
                    // The delete needs only the keys and an ETag ("*" = unconditional).
                    // NOTE(review): if ExecuteBatchInChunkAsync splits large batches, the insert and delete
                    // of one rename may land in different transactions — confirm.
                    batch.Insert(entity);
                    batch.Delete(new DynamicTableEntity(
                        entity.PartitionKey, originalRowKey, "*", new Dictionary<string, EntityProperty>()));
                    renamedKeys.Add(System.Tuple.Create(entity.PartitionKey, entity.RowKey));
                }
                else
                {
                    batch.Replace(entity);
                }

                actualAffectedRecords++;
                logger.Write("<partition key:{0}>, <row key:{1}> added to batch", entity.PartitionKey, entity.RowKey);
            }

            if (batch.Count > 0)
            {
                segmentTasks.Add(inputTable.ExecuteBatchInChunkAsync(batch));
            }
            logger.Write("Updated partition: {0}", group.Key);
        }

        // Execute() is synchronous, so blocking here is required.
        Task.WaitAll(segmentTasks.ToArray());

        totalProcessedRecords += resultSegment.Results.Count;
        logger.Write("Processed records count: {0}", totalProcessedRecords);
        logger.Write("Affected records count: {0}", actualAffectedRecords);
    }
    while (continuationToken != null);

    logger.Write("Updated {0} records", actualAffectedRecords);
    return new Dictionary<string, string>();
}

/// <summary>
/// Reads an extended property that must be present once its parent property has been supplied.
/// </summary>
/// <param name="extendedProperties">The activity's extended properties.</param>
/// <param name="key">Name of the required property.</param>
/// <returns>The property value.</returns>
/// <exception cref="KeyNotFoundException">The property is missing; the message names it.</exception>
private static string GetRequiredExtendedProperty(IDictionary<string, string> extendedProperties, string key)
{
    string value;
    if (!extendedProperties.TryGetValue(key, out value))
    {
        throw new KeyNotFoundException(string.Format("Required extended property '{0}' is missing.", key));
    }
    return value;
}