in cpp/example_code/kinesis/put_get_records.cpp [51:178]
int main(int argc, char** argv)
{
const std::string USAGE = "\n" \
"Usage:\n"
" put_get_records <streamname>\n\n"
"Where:\n"
" streamname - the table to delete the item from.\n\n"
"Example:\n"
" put_get_records sample-stream\n\n";
if (argc != 2)
{
std::cout << USAGE;
return 1;
}
Aws::SDKOptions options;
Aws::InitAPI(options);
{
const Aws::String streamName(argv[1]);
std::random_device rd;
std::mt19937 mt_rand(rd());
Aws::Client::ClientConfiguration clientConfig;
// set your region
clientConfig.region = Aws::Region::US_WEST_2;
Aws::Kinesis::KinesisClient kinesisClient(clientConfig);
Aws::Vector<Aws::String> animals{"dog", "cat", "mouse", "horse", "stoat", "snake"};
Aws::Kinesis::Model::PutRecordsRequest putRecordsRequest;
putRecordsRequest.SetStreamName(streamName);
Aws::Vector<Aws::Kinesis::Model::PutRecordsRequestEntry> putRecordsRequestEntryList;
// create 500 records
std::cout << "Adding records to stream \"" << streamName << "\"" << std::endl;
for (int i = 0; i < 500; i++)
{
Aws::Kinesis::Model::PutRecordsRequestEntry putRecordsRequestEntry;
Aws::StringStream pk;
pk << "pk-" << (i % 100);
putRecordsRequestEntry.SetPartitionKey(pk.str());
Aws::StringStream data;
data << i << ", " << animals[mt_rand() % animals.size()] << ", " << mt_rand() << ", " << mt_rand() * (float).001;
Aws::Utils::ByteBuffer bytes((unsigned char*)data.str().c_str(), data.str().length());
putRecordsRequestEntry.SetData(bytes);
putRecordsRequestEntryList.emplace_back(putRecordsRequestEntry);
}
putRecordsRequest.SetRecords(putRecordsRequestEntryList);
Aws::Kinesis::Model::PutRecordsOutcome putRecordsResult = kinesisClient.PutRecords(putRecordsRequest);
// if one or more records were not put, retry them
while (putRecordsResult.GetResult().GetFailedRecordCount() > 0)
{
std::cout << "Some records failed, retrying" << std::endl;
Aws::Vector<Aws::Kinesis::Model::PutRecordsRequestEntry> failedRecordsList;
Aws::Vector<Aws::Kinesis::Model::PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.GetResult().GetRecords();
for (unsigned int i = 0; i < putRecordsResultEntryList.size(); i++)
{
Aws::Kinesis::Model::PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList[i];
Aws::Kinesis::Model::PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList[i];
if (putRecordsResultEntry.GetErrorCode().length() > 0)
failedRecordsList.emplace_back(putRecordRequestEntry);
}
putRecordsRequestEntryList = failedRecordsList;
putRecordsRequest.SetRecords(putRecordsRequestEntryList);
putRecordsResult = kinesisClient.PutRecords(putRecordsRequest);
}
// Describe shards
Aws::Kinesis::Model::DescribeStreamRequest describeStreamRequest;
describeStreamRequest.SetStreamName(streamName);
Aws::Vector<Aws::Kinesis::Model::Shard> shards;
Aws::String exclusiveStartShardId = "";
do
{
Aws::Kinesis::Model::DescribeStreamOutcome describeStreamResult = kinesisClient.DescribeStream(describeStreamRequest);
Aws::Vector<Aws::Kinesis::Model::Shard> shardsTemp = describeStreamResult.GetResult().GetStreamDescription().GetShards();
shards.insert(shards.end(), shardsTemp.begin(), shardsTemp.end());
std::cout << describeStreamResult.GetError().GetMessage();
if (describeStreamResult.GetResult().GetStreamDescription().GetHasMoreShards() && shards.size() > 0)
{
exclusiveStartShardId = shards[shards.size() - 1].GetShardId();
describeStreamRequest.SetExclusiveStartShardId(exclusiveStartShardId);
}
else
exclusiveStartShardId = "";
} while (exclusiveStartShardId.length() != 0);
if (shards.size() > 0)
{
std::cout << "Shards found:" << std::endl;
for (auto shard : shards)
{
std::cout << shard.GetShardId() << std::endl;
}
Aws::Kinesis::Model::GetShardIteratorRequest getShardIteratorRequest;
getShardIteratorRequest.SetStreamName(streamName);
// use the first shard found
getShardIteratorRequest.SetShardId(shards[0].GetShardId());
getShardIteratorRequest.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::TRIM_HORIZON);
Aws::Kinesis::Model::GetShardIteratorOutcome getShardIteratorResult = kinesisClient.GetShardIterator(getShardIteratorRequest);
Aws::String shardIterator = getShardIteratorResult.GetResult().GetShardIterator();
Aws::Kinesis::Model::GetRecordsRequest getRecordsRequest;
getRecordsRequest.SetShardIterator(shardIterator);
getRecordsRequest.SetLimit(25);
// pull down 100 records
std::cout << "Retrieving 100 records" << std::endl;
for (int i = 0; i < 4; i++)
{
Aws::Kinesis::Model::GetRecordsOutcome getRecordsResult = kinesisClient.GetRecords(getRecordsRequest);
for (auto r : getRecordsResult.GetResult().GetRecords())
{
Aws::String s((char*)r.GetData().GetUnderlyingData());
std::cout << s.substr(0, r.GetData().GetLength()) << std::endl;
}
shardIterator = getRecordsResult.GetResult().GetNextShardIterator();
}
}
}
Aws::ShutdownAPI(options);
return 0;
}