in cloudwatch_metrics_common/src/metric_service_factory.cpp [41:93]
// Assemble a MetricService from its collaborators:
//   - a MetricPublisher built from the namespace and client configuration,
//   - a MetricBatcher that sinks into the "stream" queue and is given the file manager,
//   - a file upload streamer that reads from the MetricFileManager and sinks into the "file" queue,
//   - a QueueMonitor that observes both queues and is installed as the service's source.
//
// metrics_namespace   forwarded to the MetricPublisher constructor
// client_config       forwarded to the MetricPublisher constructor
// sdk_options         passed to Aws::InitAPI
// cloudwatch_options  supplies the file manager strategy options and the uploader sizing options
//
// Returns the fully wired MetricService.
std::shared_ptr<MetricService> MetricServiceFactory::createMetricService(
  const std::string & metrics_namespace,
  const Aws::Client::ClientConfiguration & client_config,
  const Aws::SDKOptions & sdk_options,
  const CloudWatchOptions & cloudwatch_options)
{
  // per the SDK team this only ever needs to be called once
  Aws::InitAPI(sdk_options);

  // Shorthand for the uploader sizing options that are read several times below.
  const auto & uploader = cloudwatch_options.uploader_options;

  auto file_manager =
    std::make_shared<MetricFileManager>(cloudwatch_options.file_manager_strategy_options);
  auto publisher = std::make_shared<MetricPublisher>(metrics_namespace, client_config);
  auto monitor =
    std::make_shared<Aws::DataFlow::QueueMonitor<Aws::FileManagement::TaskPtr<MetricDatumCollection>>>();

  Aws::FileManagement::FileUploadStreamerOptions streamer_options{
    uploader.file_upload_batch_size,
    uploader.file_max_queue_size
  };
  auto file_streamer =
    Aws::FileManagement::createFileUploadStreamer<MetricDatumCollection>(file_manager, streamer_options);

  // Forward publisher state changes to the file streamer; the lambda keeps its own
  // shared_ptr copy of the streamer.
  publisher->addPublisherStateListener([streamer = file_streamer](const PublisherState& state) {
    if (state == PublisherState::CONNECTED) {
      streamer->onPublisherStateChange(Aws::DataFlow::Status::AVAILABLE);
    } else {
      streamer->onPublisherStateChange(Aws::DataFlow::Status::UNAVAILABLE);
    }
  });

  // File-backed path: the streamer sinks into an observed queue that the monitor
  // watches with LOWEST_PRIORITY.
  auto file_queue =
    std::make_shared<TaskObservedBlockingQueue<MetricDatumCollection>>(uploader.file_max_queue_size);
  file_streamer->setSink(file_queue);
  monitor->addSource(file_queue, Aws::DataFlow::PriorityOptions{Aws::DataFlow::LOWEST_PRIORITY});

  // Streaming path: the batcher sinks into a second observed queue that the monitor
  // watches with HIGHEST_PRIORITY.
  auto stream_queue =
    std::make_shared<TaskObservedBlockingQueue<MetricDatumCollection>>(uploader.stream_max_queue_size);
  auto batcher = std::make_shared<MetricBatcher>(
    uploader.batch_max_queue_size,
    uploader.batch_trigger_publish_size);
  batcher->setMetricFileManager(file_manager);
  batcher->setSink(stream_queue);
  monitor->addSource(stream_queue, Aws::DataFlow::PriorityOptions{Aws::DataFlow::HIGHEST_PRIORITY});

  // The service takes the publisher, batcher and streamer, and reads from the monitor.
  auto service = std::make_shared<MetricService>(publisher, batcher, file_streamer);
  service->setSource(monitor);
  return service;
}