public IDictionary Execute()

in code/Server/DataFactory/TransformColumn.cs [53:190]


        /// <summary>
        /// Scans every entity of the activity's input Azure table and applies the transformations
        /// configured through the activity's extended properties: an optional column update
        /// (delegated to <c>ReplaceIfMatch</c>) and an optional row key substring replacement.
        /// Changes are queued as one batch per partition per result segment and executed
        /// asynchronously; the method blocks until all queued batches have finished.
        /// </summary>
        /// <remarks>
        /// Extended properties read by this method:
        /// <list type="bullet">
        /// <item><description><c>rowKeyPrefixes</c> (optional): comma-separated values passed, together with each
        /// entity's row key, to <c>IsMatch</c>; entities for which it returns false are skipped.</description></item>
        /// <item><description><c>columnName</c> (optional): enables the column update; when present,
        /// <c>columnType</c>, <c>ifColumnValueMatches</c> and <c>replaceColumnValueWith</c> are read as well.</description></item>
        /// <item><description><c>ifRowKeyContains</c> (optional): enables the row key update; when present,
        /// <c>replaceRowKeySubStrWith</c> is read as well. An empty value disables the row key update.</description></item>
        /// </list>
        /// A row key change is written as an insert of the entity under its new key plus a delete of
        /// the old key (ETag "*") in the same batch; any other update is written as a replace.
        /// </remarks>
        /// <param name="linkedServices">Linked services available to the activity; the one named by the input dataset must be an Azure Storage linked service.</param>
        /// <param name="datasets">Datasets available to the activity; the one matching the activity's first input must be an Azure table dataset.</param>
        /// <param name="activity">The activity definition; its type properties must be a <c>DotNetActivity</c>.</param>
        /// <param name="logger">Logger that receives progress messages.</param>
        /// <returns>An empty dictionary.</returns>
        /// <exception cref="System.InvalidOperationException">
        /// The input dataset is not an Azure table dataset, its linked service is not an Azure Storage
        /// linked service, or the dataset / linked service lookup finds no (or no unique) match.
        /// </exception>
        /// <exception cref="System.AggregateException">One or more queued batch operations failed.</exception>
        public IDictionary<string, string> Execute(
            IEnumerable<LinkedService> linkedServices,
            IEnumerable<Dataset> datasets,
            Microsoft.Azure.Management.DataFactories.Models.Activity activity,
            IActivityLogger logger)
        {
            DotNetActivity dotNetActivity = (DotNetActivity)activity.TypeProperties;
            IDictionary<string, string> extendedProperties = dotNetActivity.ExtendedProperties;

            logger.Write("Logging extended properties if any...");
            foreach (KeyValuePair<string, string> entry in extendedProperties)
            {
                logger.Write("<key:{0}> <value:{1}>", entry.Key, entry.Value);
            }

            // Optional row key filter.
            string[] rowKeyPrefixes = null;
            string rawRowKeyPrefixes;
            if (extendedProperties.TryGetValue("rowKeyPrefixes", out rawRowKeyPrefixes))
            {
                rowKeyPrefixes = rawRowKeyPrefixes.Split(',');
            }

            // Loop-invariant: decide once whether the filter applies at all.
            bool filterByRowKey = rowKeyPrefixes != null && rowKeyPrefixes.Length > 0;

            // Optional column update. The companion settings are mandatory once "columnName" is
            // present; the indexer throws if one of them is missing.
            string columnName = string.Empty, columnType = string.Empty, ifColumnValueMatches = string.Empty, replaceColumnValueWith = string.Empty;
            string configuredColumnName;
            bool hasColumnUpdate = extendedProperties.TryGetValue("columnName", out configuredColumnName);
            if (hasColumnUpdate)
            {
                columnName = configuredColumnName;
                columnType = extendedProperties["columnType"];
                ifColumnValueMatches = extendedProperties["ifColumnValueMatches"];
                replaceColumnValueWith = extendedProperties["replaceColumnValueWith"];
            }

            // Optional row key update.
            bool hasRowKeyUpdate = false;
            string ifRowKeyContains = string.Empty, replaceRowKeySubStrWith = string.Empty;
            string configuredRowKeySubStr;
            if (extendedProperties.TryGetValue("ifRowKeyContains", out configuredRowKeySubStr))
            {
                ifRowKeyContains = configuredRowKeySubStr;
                replaceRowKeySubStrWith = extendedProperties["replaceRowKeySubStrWith"];

                if (string.IsNullOrEmpty(ifRowKeyContains))
                {
                    // Every string "contains" the empty string, and string.Replace rejects an empty
                    // search value, so an empty setting can never describe a valid replacement.
                    logger.Write("ifRowKeyContains is empty; row key update is skipped");
                }
                else
                {
                    hasRowKeyUpdate = true;
                }
            }

            // For activities working on a single dataset, the first entry is the input dataset.
            // The activity.Inputs can have multiple datasets for building pipeline workflow dependencies. We can ignore the rest of the datasets
            var inputDatasetName = activity.Inputs.First().Name;
            Dataset inputDataset = datasets.Single(dataset => dataset.Name == inputDatasetName);

            AzureTableDataset sourceTable = inputDataset.Properties.TypeProperties as AzureTableDataset;
            if (sourceTable == null)
            {
                throw new System.InvalidOperationException(
                    string.Format("Input dataset '{0}' is not an Azure table dataset.", inputDataset.Name));
            }

            logger.Write("input table:{0}", sourceTable.TableName);

            AzureStorageLinkedService inputLinkedService = linkedServices
                .First(ls => ls.Name == inputDataset.Properties.LinkedServiceName)
                .Properties.TypeProperties as AzureStorageLinkedService;
            if (inputLinkedService == null)
            {
                throw new System.InvalidOperationException(
                    string.Format(
                        "Linked service '{0}' is not an Azure Storage linked service.",
                        inputDataset.Properties.LinkedServiceName));
            }

            string inputConnectionString = inputLinkedService.ConnectionString;

            // create storage client for input. Pass the connection string.
            CloudStorageAccount inputStorageAccount = CloudStorageAccount.Parse(inputConnectionString);
            CloudTableClient inputTableClient = inputStorageAccount.CreateCloudTableClient();
            CloudTable inputTable = inputTableClient.GetTableReference(sourceTable.TableName);

            long totalProcessedRecords = 0;
            long actualAffectedRecords = 0;
            TableContinuationToken tableContinuationToken = null;
            List<Task> tasks = new List<Task>();

            do
            {
                // Read the table one segment at a time; a null continuation token marks the last segment.
                var resultSegment = inputTable.ExecuteQuerySegmented(new TableQuery(), tableContinuationToken);
                tableContinuationToken = resultSegment.ContinuationToken;

                // Batches are built per partition key, so group the (filtered) entities by partition.
                var partitionGroups = resultSegment.Results
                    .Where(s => !filterByRowKey || this.IsMatch(s.RowKey, rowKeyPrefixes))
                    .GroupBy(s => s.PartitionKey);

                foreach (IGrouping<string, DynamicTableEntity> g in partitionGroups)
                {
                    TableBatchOperation batch = new TableBatchOperation();
                    foreach (DynamicTableEntity e in g)
                    {
                        // Snapshot the identity and properties before any transformation touches the
                        // entity; they describe the old row if it has to be deleted after a rename.
                        string originalRowKey = e.RowKey;
                        IDictionary<string, EntityProperty> originalProperties =
                            new Dictionary<string, EntityProperty>(e.Properties);

                        bool recordUpdated = false;
                        if (hasColumnUpdate)
                        {
                            recordUpdated = this.ReplaceIfMatch(e, columnName, columnType, ifColumnValueMatches, replaceColumnValueWith);
                        }

                        bool rowKeyChanged = false;
                        if (hasRowKeyUpdate && e.RowKey.Contains(ifRowKeyContains))
                        {
                            string currentRowKey = e.RowKey;
                            string newRowKey = currentRowKey.Replace(ifRowKeyContains, replaceRowKeySubStrWith);

                            // A replacement that yields the same key is not a rename. Queuing an
                            // insert and a delete for the same entity in one batch would be rejected.
                            if (newRowKey != currentRowKey)
                            {
                                e.RowKey = newRowKey;
                                rowKeyChanged = true;
                                recordUpdated = true;
                            }
                        }

                        if (!recordUpdated)
                        {
                            continue;
                        }

                        if (rowKeyChanged)
                        {
                            // The row key is part of the entity's identity, so a rename is written as
                            // "insert under the new key" plus "delete the old key" (any ETag).
                            batch.Insert(e);
                            batch.Delete(new DynamicTableEntity(e.PartitionKey, originalRowKey, "*", originalProperties));
                        }
                        else
                        {
                            batch.Replace(e);
                        }

                        actualAffectedRecords++;
                        logger.Write("<partition key:{0}>, <row key:{1}> added to batch", e.PartitionKey, e.RowKey);
                    }

                    if (batch.Count > 0)
                    {
                        tasks.Add(inputTable.ExecuteBatchInChunkAsync(batch));
                    }

                    // Logged for every partition in the segment, whether or not a batch was queued;
                    // queued batches are only awaited at the end of the method.
                    logger.Write("Updated partition: {0}", g.Key);
                }

                totalProcessedRecords += resultSegment.Results.Count;
                logger.Write("Processed records count: {0}", totalProcessedRecords);
                logger.Write("Affected records count: {0}", actualAffectedRecords);
            }
            while (tableContinuationToken != null);

            // The method signature is synchronous, so block here until every queued batch completes.
            Task.WaitAll(tasks.ToArray());
            logger.Write("Updated {0} records", actualAffectedRecords);

            return new Dictionary<string, string>();
        }