SamplesV1/HttpDataDownloaderSample/CustomDotNetActivity/DataDownloader.cs (119 lines of code) (raw):
namespace DataDownloaderActivityNS
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Reflection;
using System.Threading;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Runtime;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
public class DataDownloaderActivity : IDotNetActivity
{
private IActivityLogger _logger;
private string _dataStorageAccountName;
private string _dataStorageAccountKey;
private string _dataStorageContainer;
//public IDictionary<string, string> Execute(
// IEnumerable<ResolvedTable> inputTables,
// IEnumerable<ResolvedTable> outputTables,
// IDictionary<string, string> inputs,
// IActivityLogger activityLogger)
public IDictionary<string, string> Execute(
IEnumerable<LinkedService> linkedServices,
IEnumerable<Dataset> datasets,
Activity activity,
IActivityLogger logger)
{
// to get extended properties (for example: SliceStart)
DotNetActivity dotNetActivity = (DotNetActivity)activity.TypeProperties;
_dataStorageAccountName = dotNetActivity.ExtendedProperties["dataStorageAccountName"];
_dataStorageAccountKey = dotNetActivity.ExtendedProperties["dataStorageAccountKey"];
_dataStorageContainer = dotNetActivity.ExtendedProperties["dataStorageContainer"];
string sliceStartTime = dotNetActivity.ExtendedProperties["sliceStart"];
string urlFormat = dotNetActivity.ExtendedProperties["urlFormat"];
_logger = logger;
GatherDataForOneHour(sliceStartTime, urlFormat);
_logger.Write("Exit");
return new Dictionary<string, string>();
}
/// <summary>
/// Gather data for each Hour based on Slice Start Time.
/// </summary>
/// <param name="sliceStartTime"></param>
/// <param name="urlFormat"></param>
private void GatherDataForOneHour(string sliceStartTime, string urlFormat)
{
Uri storageAccountUri = new Uri("http://" + _dataStorageAccountName + ".blob.core.windows.net/");
string year = sliceStartTime.Substring(0, 4);
string month = sliceStartTime.Substring(4, 2);
string day = sliceStartTime.Substring(6, 2);
string hour = sliceStartTime.Substring(8, 2);
string minute = sliceStartTime.Substring(10, 2);
DateTime dataSlotGathered = new DateTime(int.Parse(year), int.Parse(month), int.Parse(day), int.Parse(hour), int.Parse(minute), 0);
_logger.Write("Current data slot gathered : {0}.......", dataSlotGathered);
// Temporary staging folder
string dataStagingFolder = string.Format(@"{0}\{1}\{1}-{2}", Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), year, month);
Directory.CreateDirectory(dataStagingFolder);
// Temporary staging file
string hourlyFileName = string.Format("data-{0}{1}{2}-{3}0000.txt", year, month, day, hour);
string decompressedFile = Path.Combine(dataStagingFolder, hourlyFileName);
try
{
_logger.Write("Gathering hourly data: ..");
TriggerRequest(urlFormat, year, month, day, hour, decompressedFile);
_logger.Write("Uploading to Blob: ..");
CloudBlobClient blobClient = new CloudBlobClient(storageAccountUri, new StorageCredentials(_dataStorageAccountName, _dataStorageAccountKey));
string blobPath = string.Format(CultureInfo.InvariantCulture, "httpdownloaddatain/{0}-{1}-{2}-{3}/{4}",
year, month, day, hour, hourlyFileName);
CloudBlobContainer container = blobClient.GetContainerReference(_dataStorageContainer);
container.CreateIfNotExists();
var blob = container.GetBlockBlobReference(blobPath);
blob.UploadFromFile(decompressedFile, FileMode.OpenOrCreate);
}
catch (Exception ex)
{
_logger.Write("Error occurred : {0}", ex);
throw;
}
finally
{
if (File.Exists(decompressedFile))
{
File.Delete(decompressedFile);
}
}
}
/// <summary>
/// Trigger request to the HTTP Endpoint
/// </summary>
/// <param name="urlFormat"></param>
/// <param name="year"></param>
/// <param name="month"></param>
/// <param name="day"></param>
/// <param name="hour"></param>
/// <param name="decompressedFile"></param>
private void TriggerRequest(string urlFormat, string year, string month, string day, string hour, string decompressedFile)
{
int retries = 0;
bool found = false;
while (retries <= 10 && !found)
{
string url = string.Format(urlFormat, year, month, day, hour, retries.ToString("00"));
try
{
_logger.Write("Making request to url : {0}..", url);
HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url);
using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
{
using (StreamReader reader = new StreamReader(response.GetResponseStream()))
{
_logger.Write("Decompressing to a file: ..");
using (FileStream decompressedFileStream = File.Create(decompressedFile))
{
using (GZipStream decompressionStream = new GZipStream(reader.BaseStream, CompressionMode.Decompress))
{
decompressionStream.CopyTo(decompressedFileStream);
_logger.Write("Decompression complete to : {0}", decompressedFile);
}
}
}
}
found = true;
}
catch (Exception e)
{
_logger.Write("Unable to download : {0} with error: {1}.", url, e.Message);
if (retries == 10)
{
throw;
}
}
retries++;
Thread.Sleep(1000);
}
}
}
}