public IDictionary Execute()

in code/Server/DataFactory/UpdateAppsTable.cs [67:200]


        public IDictionary<string, string> Execute(
            IEnumerable<LinkedService> linkedServices,
            IEnumerable<Dataset> datasets,
            Activity activity,
            IActivityLogger logger)
        {
            DotNetActivity dotNetActivity = (DotNetActivity)activity.TypeProperties;
            IDictionary<string, string> extendedProperties = dotNetActivity.ExtendedProperties;

            logger.Write("Logging extended properties if any...");
            foreach (KeyValuePair<string, string> entry in extendedProperties)
            {
                logger.Write("<key:{0}> <value:{1}>", entry.Key, entry.Value);
            }

            if (!extendedProperties.ContainsKey("columnName"))
            {
                throw new ArgumentException("Column name is required", "columnName");
            }

            string columnName = extendedProperties["columnName"];

            if (!extendedProperties.ContainsKey("columnType"))
            {
                throw new ArgumentException("Column Type information is required", "columnType");
            }

            string columnType = extendedProperties["columnType"];

            // Note that partitionKeyOwnerValueRule is required as the rules for updating value comes from it
            // We do not update column value with default value if the matching rule is not found. The record is ignored. All rules need to be explicitly specified
            if (!extendedProperties.ContainsKey("partitionKeyOwnerValueRule"))
            {
                throw new ArgumentException("PartitionKeyOwnerValueRule information is required", "partitionKeyOwnerValueRule");
            }

            string partitionKeyOwnerValueRule = extendedProperties["partitionKeyOwnerValueRule"];

            string[] rowKeyPrefixes = null;
            if (extendedProperties.ContainsKey("rowKeyPrefixes"))
            {
                rowKeyPrefixes = extendedProperties["rowKeyPrefixes"].Split(',');
            }

            var partitionKeyOwnerValueRuleDict = partitionKeyOwnerValueRule.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
               .Select(part => part.Split('='))
               .ToDictionary(split => split[0], split => split[1]);

            var appHandles = ownerAppHandles.Where(item => partitionKeyOwnerValueRuleDict.ContainsKey(item.Key)).SelectMany(item => item.Value).ToList();

            logger.Write("Matching appHandles:{0}", string.Join(",", appHandles));

            AzureStorageLinkedService inputLinkedService;
            AzureTableDataset sourceTable;

            // For activities working on a single dataset, the first entry is the input dataset.
            // The activity.Inputs can have multiple datasets for building pipeline workflow dependencies. We can ignore the rest of the datasets
            Dataset inputDataset = datasets.Single(dataset => dataset.Name == activity.Inputs.First().Name);
            sourceTable = inputDataset.Properties.TypeProperties as AzureTableDataset;

            logger.Write("input table:{0}", sourceTable.TableName);

            inputLinkedService = linkedServices.First(
                ls =>
                ls.Name ==
                inputDataset.Properties.LinkedServiceName).Properties.TypeProperties
                as AzureStorageLinkedService;
            string inputConnectionString = inputLinkedService.ConnectionString;

            // create storage client for input. Pass the connection string.
            CloudStorageAccount inputStorageAccount = CloudStorageAccount.Parse(inputConnectionString);
            CloudTableClient inputTableClient = inputStorageAccount.CreateCloudTableClient();
            CloudTable inputTable = inputTableClient.GetTableReference(sourceTable.TableName);

            long totalProcessedRecords = 0;
            long actualAffectedRecords = 0;
            TableContinuationToken tableContinuationToken = null;
            List<Task> tasks = new List<Task>();

            do
            {
                var resultSegment = inputTable.ExecuteQuerySegmented(new TableQuery(), tableContinuationToken);
                tableContinuationToken = resultSegment.ContinuationToken;

                var partitionGroups = (from s in resultSegment.Results
                                       where (rowKeyPrefixes == null || rowKeyPrefixes.Length <= 0) ? true : this.IsMatch(s.RowKey, rowKeyPrefixes)
                                       select s).GroupBy(a => a.PartitionKey);

                foreach (IGrouping<string, DynamicTableEntity> g in partitionGroups)
                {
                    TableBatchOperation batch = new TableBatchOperation();
                    foreach (DynamicTableEntity e in g.AsEnumerable())
                    {
                        // If appHandles do not contain the partition key, Continue
                        if (!appHandles.Contains(e.PartitionKey))
                        {
                            continue;
                        }
                        else
                        {
                            // Pick the value to be used for specified AppHandle
                            // This is done by getting the owber key first from e.PartitionKey
                            var ownerKey = ownerAppHandles.FirstOrDefault(x => x.Value.Contains(e.PartitionKey)).Key;

                            // The owner key is used to pick the value for the column
                            string newColumnValue = partitionKeyOwnerValueRuleDict[ownerKey];
                            if (this.ReplaceColumnValue(e, columnName, columnType, newColumnValue))
                            {
                                batch.Merge(e);
                                logger.Write("<partition key:{0}>, <row key:{1}>", e.PartitionKey, e.RowKey);
                            }
                        }
                    }

                    if (batch.Count > 0)
                    {
                        tasks.Add(inputTable.ExecuteBatchInChunkAsync(batch));
                        actualAffectedRecords += batch.Count;
                    }

                    logger.Write("Updated partition: {0}", g.Key);
                }

                totalProcessedRecords += resultSegment.Results.Count;
                logger.Write("Processed records count: {0}", totalProcessedRecords);
                logger.Write("Affected records count: {0}", actualAffectedRecords);
            }
            while (tableContinuationToken != null);

            Task.WaitAll(tasks.ToArray());
            logger.Write("Updated {0} records", actualAffectedRecords);

            return new Dictionary<string, string>();
        }