SamplesV1/ADFCustomActivityRunner/CustomActivityRunner/CustomActivityBase.cs (145 lines of code) (raw):

using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; using Microsoft.ADF.DotNetActivityRunner; using Microsoft.Azure.Management.DataFactories.Models; using Microsoft.Azure.Management.DataFactories.Runtime; namespace CustomActivityRunner { public abstract class CustomActivityBase : IDotNetActivity { public IEnumerable<LinkedService> LinkedServices { get; private set; } public IEnumerable<Dataset> Datasets { get; private set; } public Activity Activity { get; private set; } public IActivityLogger Logger { get; private set; } private DotNetActivity typeProperties; public CustomActivityBase() { if (Debugger.IsAttached) { var attributes = this.GetType().GetMethod("RunActivity").CustomAttributes; var customActivityAttribute = attributes.FirstOrDefault(x => x.AttributeType.Name == "CustomActivityAttribute"); string activityName = customActivityAttribute?.NamedArguments?.FirstOrDefault(x => x.MemberName == "ActivityName").TypedValue.Value?.ToString(); string pipelineLocation = customActivityAttribute?.NamedArguments?.FirstOrDefault(x => x.MemberName == "PipelineLocation").TypedValue.Value?.ToString(); string deployConfig = customActivityAttribute?.NamedArguments?.FirstOrDefault(x => x.MemberName == "DeployConfig").TypedValue.Value?.ToString(); if (!string.IsNullOrEmpty(activityName) || !string.IsNullOrEmpty(pipelineLocation)) { string dataFactoryProjLocation = Path.GetFullPath(Path.Combine(Directory.GetCurrentDirectory(), "..\\..", Path.GetDirectoryName(pipelineLocation))); DotNetActivityContext context = Runner.DeserializeActivity(Path.GetFileName(pipelineLocation), activityName, deployConfig, dataFactoryProjLocation); LinkedServices = context.LinkedServices; Datasets = context.Datasets; Activity = context.Activity; Logger = context.Logger; typeProperties = Activity.TypeProperties as DotNetActivity; } else { throw new Exception($"The CustomActivity attribute needs to have the following properties populated: {nameof(CustomActivityAttribute.PipelineLocation)} and {nameof(CustomActivityAttribute.ActivityName)}"); } } } public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger) { LinkedServices = linkedServices; Datasets = datasets; Activity = activity; Logger = logger; typeProperties = Activity.TypeProperties as DotNetActivity; return RunActivity(); } public abstract IDictionary<string, string> RunActivity(); public string GetExtendedProperty(string name) { if (!typeProperties.ExtendedProperties.ContainsKey(name)) { throw new KeyNotFoundException($"The extended property '{name}' was not found in the extendedProperties section of the activity.'"); } return typeProperties.ExtendedProperties[name]; } public string GetInputSqlConnectionString() { string activityInputName = Activity.Inputs.First().Name; return GetSqlConnectionString(activityInputName); } public string GetOutputSqlConnectionString() { string activityOutputName = Activity.Outputs.First().Name; return GetSqlConnectionString(activityOutputName); } public string GetSqlConnectionString(string datasetName) { Dataset dataset = Datasets.Single(x => x.Name == datasetName); LinkedService linkedService = LinkedServices.First(x => x.Name == dataset.Properties.LinkedServiceName); if (linkedService.Properties.Type != "AzureSqlDatabase") { throw new Exception($"The linked service is of type '{linkedService.Properties.Type}'. It should be of type 'AzureSqlDatabase'."); } AzureSqlDatabaseLinkedService sqlLinkedService = linkedService.Properties.TypeProperties as AzureSqlDatabaseLinkedService; if (sqlLinkedService == null) { throw new Exception($"Unable to find data set name '{datasetName}'."); } string connectionString = sqlLinkedService.ConnectionString; if (string.IsNullOrEmpty(connectionString)) { throw new Exception($"Connection string for '{linkedService.Name}' linked service is empty."); } return connectionString; } public T GetLinkedService<T>(string name) where T : class { int linkedServiceCount = LinkedServices.Count(x => x.Name == name); if (linkedServiceCount == 0) { throw new Exception($"The linked service '{name}' was not found."); } if (linkedServiceCount > 1) { throw new Exception($"More than one linked service with name '{name}' were found. Only one should exist."); } return LinkedServices.First(x => x.Name == name).Properties.TypeProperties as T; } public T GetDataset<T>(string name) where T : class { int datasetCount = Datasets.Count(x => x.Name == name); if (datasetCount == 0) { throw new Exception($"The dataset '{name}' was not found."); } if (datasetCount > 1) { throw new Exception($"More than one datasets with name '{name}' were found. Only one should exist."); } return Datasets.First(x => x.Name == name).Properties.TypeProperties as T; } public string GetBlobFolderPath(string datasetName) { Dataset dataset = Datasets.Single(x => x.Name == datasetName); AzureBlobDataset outputProps = (AzureBlobDataset)dataset.Properties.TypeProperties; return outputProps.FolderPath; } public BlobUtilities GetBlob(string datasetName) { Dataset dataset = Datasets.Single(x => x.Name == datasetName); AzureBlobDataset outputProps = (AzureBlobDataset)dataset.Properties.TypeProperties; AzureStorageLinkedService outputLinkedService = LinkedServices.First(x => x.Name == dataset.Properties.LinkedServiceName).Properties.TypeProperties as AzureStorageLinkedService; string outputConnectionString = outputLinkedService?.ConnectionString; BlobUtilities outputBlob = new BlobUtilities(Logger, outputConnectionString, outputProps.FolderPath); return outputBlob; } public Dictionary<string, string> GetAllExtendedProperties() { DotNetActivity dotNetActivity = (DotNetActivity)Activity.TypeProperties; if (!dotNetActivity.ExtendedProperties.Any()) { throw new Exception($"No properties found in the extended properties section of the custom activity '{Activity.Name}'"); } return dotNetActivity.ExtendedProperties as Dictionary<string, string>; } } }