std::shared_ptr LogServiceFactory::CreateLogService()

in cloudwatch_logs_common/src/log_service_factory.cpp [42:95]


std::shared_ptr<LogService> LogServiceFactory::CreateLogService(
  const std::string & log_group,
  const std::string & log_stream,
  const Aws::Client::ClientConfiguration & client_config,
  const Aws::SDKOptions & sdk_options,
  const CloudWatchOptions & cloudwatch_options)
{
  Aws::InitAPI(sdk_options); // per the SDK team this only ever needs to be called once

  auto log_file_manager = std::make_shared<LogFileManager>(cloudwatch_options.file_manager_strategy_options);

  auto publisher = std::make_shared<LogPublisher>(log_group, log_stream, client_config);

  auto queue_monitor =
      std::make_shared<Aws::DataFlow::QueueMonitor<Aws::FileManagement::TaskPtr<LogCollection>>>();

  Aws::FileManagement::FileUploadStreamerOptions file_upload_options{
    cloudwatch_options.uploader_options.file_upload_batch_size,
    cloudwatch_options.uploader_options.file_max_queue_size
  };

  auto log_file_upload_streamer =
      Aws::FileManagement::createFileUploadStreamer<LogCollection>(log_file_manager, file_upload_options);

  // connect publisher state changes to the File Streamer
  publisher->addPublisherStateListener([upload_streamer = log_file_upload_streamer](const PublisherState& state) {
    auto status =
      (state == PublisherState::CONNECTED) ? Aws::DataFlow::Status::AVAILABLE : Aws::DataFlow::Status::UNAVAILABLE;
    upload_streamer->onPublisherStateChange(status);
  });

  // Create an observed queue to trigger a publish when data is available
  auto file_data_queue =
      std::make_shared<TaskObservedBlockingQueue<LogCollection>>(cloudwatch_options.uploader_options.file_max_queue_size);

  auto stream_data_queue = std::make_shared<TaskObservedBlockingQueue<LogCollection>>(cloudwatch_options.uploader_options.stream_max_queue_size);

  auto log_batcher = std::make_shared<LogBatcher>(
    cloudwatch_options.uploader_options.batch_max_queue_size,
    cloudwatch_options.uploader_options.batch_trigger_publish_size
  );
  log_batcher->setLogFileManager(log_file_manager);

  log_file_upload_streamer->setSink(file_data_queue);
  queue_monitor->addSource(file_data_queue, DataFlow::PriorityOptions{Aws::DataFlow::LOWEST_PRIORITY});

  log_batcher->setSink(stream_data_queue);
  queue_monitor->addSource(stream_data_queue, DataFlow::PriorityOptions{Aws::DataFlow::HIGHEST_PRIORITY});

  auto log_service = std::make_shared<LogService>(publisher, log_batcher, log_file_upload_streamer);
  log_service->setSource(queue_monitor);

  return log_service;  // allow user to start
}