SamplesV1/RunRScriptUsingADFSample/InvokeRScript.cs (218 lines of code) (raw):

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Reflection;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Runtime;
using Microsoft.WindowsAzure.Storage;

namespace ADF.Sample.ExecuteRScriptWithCustomActivity
{
    public class RExecutionActivity : IDotNetActivity
    {
        /// <summary>
        /// Override Execute method in ADF .Net Activity
        /// </summary>
        /// <param name="linkedServices"></param>
        /// <param name="datasets"></param>
        /// <param name="activity"></param>
        /// <param name="logger"></param>
        /// <returns></returns>
        public IDictionary<string, string> Execute(
            IEnumerable<LinkedService> linkedServices,
            IEnumerable<Dataset> datasets,
            Activity activity,
            IActivityLogger logger)
        {
            logger.Write("Executing R with ADF .Net Activity");

            AppDomain.CurrentDomain.AssemblyResolve += CurrentDomain_AssemblyResolve;

            IDictionary<string, string> extendedProperties =
                ((DotNetActivity)activity.TypeProperties).ExtendedProperties;

            // These file paths are specific to the R script. We pass the file paths
            // to our R script as parameters via the pipeline JSON.
            string experimentType;
            extendedProperties.TryGetValue("modelType", out experimentType);

            string snapShotFile;
            extendedProperties.TryGetValue("snapShotFile", out snapShotFile);

            string timeSeriesFile;
            extendedProperties.TryGetValue("timeSeriesFile", out timeSeriesFile);

            string blobPath;
            extendedProperties.TryGetValue("blobPath", out blobPath);

            string churnTagFile;
            extendedProperties.TryGetValue("churnTagFile", out churnTagFile);

            string outputFile;
            extendedProperties.TryGetValue("outputFile", out outputFile);

            logger.Write("Starting Batch Execution Service");
            InvokeR(experimentType, snapShotFile, timeSeriesFile, churnTagFile, blobPath, outputFile, logger);

            return new Dictionary<string, string>();
        }

        /// <summary>
        /// Resolve local path on HDInsight VM where the ADF .Net activity is running
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="args"></param>
        /// <returns></returns>
        public Assembly CurrentDomain_AssemblyResolve(object sender, ResolveEventArgs args)
        {
            var assemblyname = new AssemblyName(args.Name).Name;
            if (assemblyname.Contains("Microsoft.WindowsAzure.Storage"))
            {
                assemblyname = "Microsoft.WindowsAzure.Storage";
                var assemblyFileName = Path.Combine(
                    Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location),
                    assemblyname + ".dll");
                var assembly = Assembly.LoadFrom(assemblyFileName);
                return assembly;
            }
            return null;
        }

        /// <summary>
        /// Invoke RScript.exe and run the R script
        /// </summary>
        /// <param name="experimentType"></param>
        /// <param name="snapShotFile"></param>
        /// <param name="timeSeriesFile"></param>
        /// <param name="churnTagFile"></param>
        /// <param name="blobPath"></param>
        /// <param name="outputFile"></param>
        /// <param name="logger"></param>
        public static void InvokeR(string experimentType, string snapShotFile, string timeSeriesFile,
            string churnTagFile, string blobPath, string outputFile, IActivityLogger logger)
        {
            const string accountName = "<Provide Storage Account Name>";
            const string accountKey = "<Provide Storage Account Key>";
            var connectionString = String.Format(
                "DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", accountName, accountKey);

            var process = new Process();
            try
            {
                string pathToRExecutable;
                string[] blobNames;
                const string containerName = "hdiclustertest";

                // gen_ts_features.r is one of the files the R script requires.
                // train.r and score.r are the R scripts themselves.
                var sourceFile = "gen_ts_features.r";
                if (experimentType.ToUpper().Equals("TRAINING"))
                {
                    pathToRExecutable = "train.r";
                    blobNames = new[] { sourceFile, pathToRExecutable, snapShotFile, timeSeriesFile, churnTagFile };
                }
                else
                {
                    pathToRExecutable = "score.r";
                    blobNames = new[] { sourceFile, pathToRExecutable, snapShotFile, timeSeriesFile };
                }

                var resultBlobPath = String.Format("{0}/{1}", containerName, blobPath);

                logger.Write("Creating working directory");
                logger.Write(String.Format("Machine Name: {0}", Environment.MachineName));
                var workingDirectory = new FileInfo(typeof(RExecutionActivity).Assembly.Location).DirectoryName;
                logger.Write(String.Format("Directory Name : {0}", workingDirectory));
                //var workingDirectory = CreateWorkingDirectory();
                logger.Write(String.Format("Working directory created: {0}", workingDirectory));

                DirectoryCopy(@"C:\apps\dist\R", workingDirectory, true);

                logger.Write("Downloading input files used by this sample to the Working Directory");
                var inputFileNames = DownloadInputFiles(workingDirectory, connectionString, resultBlobPath, blobNames);

                for (var index = 0; index < inputFileNames.Length; index++)
                {
                    var file = inputFileNames[index];
                    if (File.Exists(file))
                    {
                        logger.Write(String.Format("File : {0} exists", file));
                    }
                }
                logger.Write("Input Files Download completed");

                sourceFile = inputFileNames[0];
                pathToRExecutable = inputFileNames[1];
                var ssFile = inputFileNames[2];
                var tsFile = inputFileNames[3];

                string args;
                var outputFileName = String.Format("{0}\\{1}", workingDirectory, outputFile);
                logger.Write(String.Format("Output file name : {0}", outputFileName));
                if (experimentType.ToUpper().Equals("TRAINING"))
                {
                    var tagFile = inputFileNames[4];
                    args = String.Format("{0} {1} {2} {3} {4}", sourceFile, tsFile, ssFile, tagFile, outputFileName);
                    logger.Write(String.Format("Arguments in training are : {0}", args));
                }
                else
                {
                    args = String.Format("{0} {1} {2} {3}", sourceFile, tsFile, ssFile, outputFileName);
                }

                ///// R execution /////
                ProcessStartInfo startInfo = new ProcessStartInfo
                {
                    WindowStyle = ProcessWindowStyle.Hidden,
                    UseShellExecute = false,
                    RedirectStandardError = true,
                    RedirectStandardOutput = true,
                    CreateNoWindow = true
                };

                logger.Write(File.Exists(String.Format("{0}{1}", workingDirectory, @"\R-3.2.2\bin\x64\Rscript.exe"))
                    ? "R File exists"
                    : "R file does not exist");

                startInfo.FileName = String.Format("{0}{1}", workingDirectory, @"\R-3.2.2\bin\x64\Rscript.exe");
                startInfo.Arguments = String.Format("{0} {1}", pathToRExecutable, args);
                if (workingDirectory != null) startInfo.WorkingDirectory = workingDirectory;

                logger.Write("R Execution started");
                process.StartInfo = startInfo;
                process.Start();
                logger.Write(String.Format("Process started with process id : {0} on machine : {1}",
                    process.Id, process.MachineName));

                var errorReader = process.StandardError;
                var outputReader = process.StandardOutput;

                while (!outputReader.EndOfStream)
                {
                    var text = outputReader.ReadLine();
                    logger.Write(text);
                }
                logger.Write("output reader complete");

                while (!errorReader.EndOfStream)
                {
                    var text = errorReader.ReadLine();
                    logger.Write(text);
                }
                logger.Write("error reader complete");

                // Both redirected streams have been fully drained above, so wait for the process to finish.
                process.WaitForExit();
                logger.Write(String.Format("Process start time : {0}, end time : {1}",
                    process.StartTime, process.ExitTime));

                ///// Upload file /////
                if (File.Exists(outputFileName))
                {
                    logger.Write("Uploading file started");
                    UploadFile(connectionString, resultBlobPath, outputFileName, outputFile);
                }
                else
                {
                    logger.Write("output file not found");
                }
            }
            catch (Exception ex)
            {
                logger.Write(String.Format("Exception is : {0}", ex.Message));
            }
        }

        /// <summary>
        /// Upload file to Azure Blob
        /// </summary>
        /// <param name="connectionString"></param>
        /// <param name="containerName"></param>
        /// <param name="filePath"></param>
        /// <param name="fileName"></param>
        private static void UploadFile(string connectionString, string containerName, string filePath, string fileName)
        {
            var storageAccount = CloudStorageAccount.Parse(connectionString);
            var blobClient = storageAccount.CreateCloudBlobClient();
            var container = blobClient.GetContainerReference(containerName);
            var blob = container.GetBlockBlobReference(fileName);
            blob.UploadFromFile(filePath, FileMode.Open);
            Console.WriteLine("File upload completed");
        }

        /// <summary>
        /// Download input files from Azure blob
        /// </summary>
        /// <param name="workingDirectory"></param>
        /// <param name="connectionString"></param>
        /// <param name="containerName"></param>
        /// <param name="blobNames"></param>
        /// <returns></returns>
        private static string[] DownloadInputFiles(string workingDirectory, string connectionString,
            string containerName, string[] blobNames)
        {
            var inputStorageAccount = CloudStorageAccount.Parse(connectionString);
            var inputClient = inputStorageAccount.CreateCloudBlobClient();
            var container = inputClient.GetContainerReference(containerName);

            var inputFiles = new string[blobNames.Length];
            for (var blobCnt = 0; blobCnt < blobNames.Length; blobCnt++)
            {
                var blobName = blobNames[blobCnt];
                var blockBlob = container.GetBlockBlobReference(blobName);
                using (var fileStream = File.OpenWrite(Path.Combine(workingDirectory, blockBlob.Name)))
                {
                    blockBlob.DownloadToStream(fileStream);
                    inputFiles[blobCnt] = Path.Combine(workingDirectory, blockBlob.Name);
                }
            }
            return inputFiles;
        }

        /// <summary>
        /// Copy files from source to destination directory recursively
        /// </summary>
        /// <param name="sourceDirName"></param>
        /// <param name="destDirName"></param>
        /// <param name="copySubDirs"></param>
        private static void DirectoryCopy(string sourceDirName, string destDirName, bool copySubDirs)
        {
            // Get the subdirectories for the specified directory.
            var dir = new DirectoryInfo(sourceDirName);
            if (!dir.Exists)
            {
                throw new DirectoryNotFoundException(
                    "Source directory does not exist or could not be found: " + sourceDirName);
            }

            var dirs = dir.GetDirectories();

            // If the destination directory doesn't exist, create it.
            if (!Directory.Exists(destDirName))
            {
                Directory.CreateDirectory(destDirName);
            }

            // Get the files in the directory and copy them to the new location.
            var files = dir.GetFiles();
            foreach (var file in files)
            {
                var temppath = Path.Combine(destDirName, file.Name);
                file.CopyTo(temppath, false);
            }

            // If copying subdirectories, copy them and their contents to the new location.
            if (!copySubDirs) return;
            foreach (var subdir in dirs)
            {
                var temppath = Path.Combine(destDirName, subdir.Name);
                DirectoryCopy(subdir.FullName, temppath, true);
            }
        }
    }
}
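
A minimal sketch for exercising InvokeR outside of Data Factory during local debugging, assuming IActivityLogger exposes only the Write(string format, params object[] args) overload used throughout the activity. The ConsoleActivityLogger class, the LocalRunner harness, and all argument values below are hypothetical and merely illustrate the extendedProperties keys (modelType, snapShotFile, timeSeriesFile, churnTagFile, blobPath, outputFile) that the pipeline JSON is expected to supply.

// Hypothetical local test harness; not part of the ADF sample itself.
using System;
using Microsoft.Azure.Management.DataFactories.Runtime;

namespace ADF.Sample.ExecuteRScriptWithCustomActivity
{
    // Assumes IActivityLogger declares the single Write(format, args) member used above.
    internal class ConsoleActivityLogger : IActivityLogger
    {
        public void Write(string format, params object[] args)
        {
            Console.WriteLine(format, args);
        }
    }

    internal static class LocalRunner
    {
        private static void Main()
        {
            // Example values only; in a real pipeline run these come from
            // typeProperties.extendedProperties of the DotNetActivity.
            RExecutionActivity.InvokeR(
                experimentType: "Training",
                snapShotFile: "snapshot.csv",
                timeSeriesFile: "timeseries.csv",
                churnTagFile: "churntag.csv",
                blobPath: "churn/input",
                outputFile: "model.rdata",
                logger: new ConsoleActivityLogger());
        }
    }
}

In an actual deployment the storage account placeholders in InvokeR must also be filled in, since the harness downloads inputs from and uploads results to the blob paths built from those values.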