std::shared_ptr MetricServiceFactory::createMetricService()

in cloudwatch_metrics_common/src/metric_service_factory.cpp [41:93]


std::shared_ptr<MetricService> MetricServiceFactory::createMetricService(
        const std::string & metrics_namespace,
        const Aws::Client::ClientConfiguration & client_config,
        const Aws::SDKOptions & sdk_options,
        const CloudWatchOptions & cloudwatch_options)
{
  Aws::InitAPI(sdk_options); // per the SDK team this only ever needs to be called once

  auto metric_file_manager = std::make_shared<MetricFileManager>(cloudwatch_options.file_manager_strategy_options);

  auto metric_publisher = std::make_shared<MetricPublisher>(metrics_namespace, client_config);

  auto queue_monitor =
          std::make_shared<Aws::DataFlow::QueueMonitor<Aws::FileManagement::TaskPtr<MetricDatumCollection>>>();

  Aws::FileManagement::FileUploadStreamerOptions file_upload_options{
    cloudwatch_options.uploader_options.file_upload_batch_size,
    cloudwatch_options.uploader_options.file_max_queue_size
  };

  auto metric_file_upload_streamer =
          Aws::FileManagement::createFileUploadStreamer<MetricDatumCollection>(metric_file_manager, file_upload_options);

  // connect publisher state changes to the File Streamer
  metric_publisher->addPublisherStateListener([upload_streamer = metric_file_upload_streamer](const PublisherState& state) {
      auto status =
              (state == PublisherState::CONNECTED) ? Aws::DataFlow::Status::AVAILABLE : Aws::DataFlow::Status::UNAVAILABLE;
      upload_streamer->onPublisherStateChange(status);
  });

  // Create an observed queue to trigger a publish when data is available
  auto file_data_queue =
          std::make_shared<TaskObservedBlockingQueue<MetricDatumCollection>>(cloudwatch_options.uploader_options.file_max_queue_size);

  auto stream_data_queue = std::make_shared<TaskObservedBlockingQueue<MetricDatumCollection>>(cloudwatch_options.uploader_options.stream_max_queue_size);

  metric_file_upload_streamer->setSink(file_data_queue);
  queue_monitor->addSource(file_data_queue, DataFlow::PriorityOptions{Aws::DataFlow::LOWEST_PRIORITY});

  auto metric_batcher = std::make_shared<MetricBatcher>(
          cloudwatch_options.uploader_options.batch_max_queue_size,
          cloudwatch_options.uploader_options.batch_trigger_publish_size
  );
  metric_batcher->setMetricFileManager(metric_file_manager);

  metric_batcher->setSink(stream_data_queue);
  queue_monitor->addSource(stream_data_queue, Aws::DataFlow::PriorityOptions{Aws::DataFlow::HIGHEST_PRIORITY});

  auto metric_service = std::make_shared<MetricService>(metric_publisher, metric_batcher, metric_file_upload_streamer);
  metric_service->setSource(queue_monitor);

  return metric_service;
}