in SamplesV1/TwitterAnalysisSample-CustomC#Activity/CustomC#ActivityClass/SentimentAnalysis.cs [136:215]
/// <summary>
/// Submits the input blob to the batch execution endpoint at <c>_baseUrl</c>, polls the
/// job once per second until it reaches a terminal state or the timeout elapses, and on
/// success starts a copy of the result blob into <c>_outputBlobName</c> in the configured
/// storage container.
/// </summary>
/// <returns>A task that completes when the job has finished, or has been deleted after timing out.</returns>
/// <exception cref="HttpRequestException">The job submission returned a non-success HTTP status.</exception>
/// <exception cref="Exception">The service reported the job as Failed or Cancelled.</exception>
async Task InvokeBatchExecutionService()
{
    // How this works:
    //
    // 1. The tweets are already present in an Azure blob.
    // 2. The Batch Execution Service is called to process the data in that blob.
    // 3. The service writes its results to a blob it describes in the job status.
    // 4. That result blob is copied to the output blob in your storage container.
    var blobClient = CloudStorageAccount.Parse(_storageConnectionString).CreateCloudBlobClient();
    var container = blobClient.GetContainerReference(_storageContainerName);
    var blob = container.GetBlockBlobReference(_inputBlobName);

    _logger.Write("Submitting the job...");

    // Upper bound on how long the job is polled: 12,000,000 ms (200 minutes).
    const int timeOutInMilliseconds = 120 * 100000;

    using (HttpClient client = new HttpClient())
    {
        BatchScoreRequest request = new BatchScoreRequest()
        {
            Input = new AzureBlobDataReference()
            {
                ConnectionString = _storageConnectionString,
                RelativeLocation = blob.Uri.LocalPath
            },
        };

        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", _apiKey);

        var response = await client.PostAsJsonAsync(_baseUrl, request);

        // Fail fast on a rejected submission instead of treating an error payload as a job id.
        response.EnsureSuccessStatusCode();

        string jobId = await response.Content.ReadAsAsync<string>();
        string jobLocation = _baseUrl + "/" + jobId;

        Stopwatch watch = Stopwatch.StartNew();
        bool done = false;
        while (!done)
        {
            response = await client.GetAsync(jobLocation);
            BatchScoreStatus status = await response.Content.ReadAsAsync<BatchScoreStatus>();

            switch (status.StatusCode)
            {
                case BatchScoreStatusCode.NotStarted:
                    _logger.Write("Not started...");
                    break;
                case BatchScoreStatusCode.Running:
                    _logger.Write("Running...");
                    break;
                case BatchScoreStatusCode.Failed:
                    _logger.Write("Failed!");
                    _logger.Write("Error details : {0}", status.Details);
                    throw new Exception(status.Details);
                case BatchScoreStatusCode.Cancelled:
                    _logger.Write("Cancelled!");
                    throw new Exception(status.Details);
                case BatchScoreStatusCode.Finished:
                    done = true;
                    _logger.Write("Finished!");
                    var credentials = new StorageCredentials(status.Result.SasBlobToken);
                    var sourceCloudBlob = new CloudBlockBlob(new Uri(new Uri(status.Result.BaseLocation), status.Result.RelativeLocation), credentials);
                    var targetCloudBlob = container.GetBlockBlobReference(_outputBlobName);
                    // NOTE(review): StartCopy appears to only initiate a server-side copy; the
                    // message below may be logged before the copy has finished - confirm
                    // against the storage SDK version in use.
                    targetCloudBlob.StartCopy(sourceCloudBlob);
                    _logger.Write("Copy to Output Blob Complete...");
                    break;
            }

            // The timeout is evaluated only after the status has been handled and only while
            // the job is still pending, so a job that finished on the final poll is not
            // deleted before its output is copied.
            if (!done && watch.ElapsedMilliseconds > timeOutInMilliseconds)
            {
                done = true;
                _logger.Write("Timed out. Deleting the job ...");
                await client.DeleteAsync(jobLocation);
                // NOTE(review): the method returns normally after a timeout, so the caller
                // cannot distinguish it from success - consider surfacing this.
            }

            if (!done)
            {
                // Wait one second between polls without blocking the calling thread.
                await Task.Delay(1000);
            }
        }
    }
}