in SampleProducer/SampleProducer.cs [49:110]
/// <summary>
/// Sample producer entry point: requests creation of the sample stream, waits for it
/// to become available, then writes ten UTF-8 encoded test records to it.
/// </summary>
/// <param name="args">Command-line arguments; not used by this method.</param>
public static void Main(string[] args)
{
    const string myStreamName = "kclnetsample";
    const int myStreamSize = 1;

    try
    {
        var createStreamRequest = new CreateStreamRequest
        {
            StreamName = myStreamName,
            ShardCount = myStreamSize
        };

        // Block until the request completes. The response is not needed; a failure
        // surfaces as an AggregateException, which is handled below.
        kinesisClient.CreateStreamAsync(createStreamRequest).Wait();
        Console.Error.WriteLine("Created Stream : " + myStreamName);
    }
    catch (AggregateException ae)
    {
        // An already-existing stream is fine; any other failure is rethrown by Handle.
        ae.Handle((x) =>
        {
            if (x is ResourceInUseException)
            {
                Console.Error.WriteLine("Producer is not creating stream " + myStreamName +
                    " to put records into as a stream of the same name already exists.");
                return true;
            }
            return false; // Let anything else stop the application.
        });
    }

    WaitForStreamToBecomeAvailable(myStreamName);

    Console.Error.WriteLine("Putting records in stream : " + myStreamName);
    // Write 10 UTF-8 encoded records to the stream.
    for (int j = 0; j < 10; ++j)
    {
        // The payload stream is only needed for the duration of the blocking put call,
        // so dispose it as soon as the call has returned.
        using (var payload = new MemoryStream(Encoding.UTF8.GetBytes("testData-" + j)))
        {
            var requestRecord = new PutRecordRequest
            {
                StreamName = myStreamName,
                Data = payload,
                PartitionKey = "partitionKey-" + j
            };

            var putResultResponse = kinesisClient.PutRecordAsync(requestRecord).Result;
            Console.Error.WriteLine(
                String.Format("Successfully putrecord {0}:\n\t partition key = {1,15}, shard ID = {2}",
                    j, requestRecord.PartitionKey, putResultResponse.ShardId));
        }
    }

    // Uncomment the following if you wish to delete the stream here.
    // GetAwaiter().GetResult() rethrows the original exception instead of wrapping it
    // in an AggregateException, so the catch clauses below can match it directly.
    //Console.Error.WriteLine("Deleting stream : " + myStreamName);
    //DeleteStreamRequest deleteStreamReq = new DeleteStreamRequest();
    //deleteStreamReq.StreamName = myStreamName;
    //try
    //{
    //    kinesisClient.DeleteStreamAsync(deleteStreamReq).GetAwaiter().GetResult();
    //    Console.Error.WriteLine("Stream is now being deleted : " + myStreamName);
    //}
    //catch (ResourceNotFoundException ex)
    //{
    //    Console.Error.WriteLine("Stream could not be found; " + ex);
    //}
    //catch (AmazonClientException ex)
    //{
    //    Console.Error.WriteLine("Error deleting stream; " + ex);
    //}
}