int main()

in auomscollect.cpp [306:700]


int main(int argc, char**argv) {
    std::string config_file = AUOMSCOLLECT_CONF;
    int stop_delay = 0; // seconds
    bool netlink_mode = false;
    bool debug_mode = false;

    int opt;
    while ((opt = getopt(argc, argv, "c:dns:")) != -1) {
        switch (opt) {
            case 'c':
                config_file = optarg;
                break;
            case 'd':
                debug_mode = true;
                break;
            case 's':
                stop_delay = atoi(optarg);
                break;
            case 'n':
                netlink_mode = true;
                break;
            default:
                usage();
        }
    }

    if (debug_mode) {
        // Enable core dumps
        struct rlimit limits;
        limits.rlim_cur = RLIM_INFINITY;
        limits.rlim_max = RLIM_INFINITY;
        setrlimit(RLIMIT_CORE, &limits);
    }

    auto user_db = std::make_shared<UserDB>();

    try {
        user_db->Start();
    } catch (const std::exception& ex) {
        Logger::Error("Unexpected exception during user_db startup: %s", ex.what());
        exit(1);
    } catch (...) {
        Logger::Error("Unexpected exception during user_db startup");
        exit(1);
    }

    Config config;

    if (!config_file.empty()) {
        try {
            Logger::Info("Opening config file %s", config_file.c_str());
            config.Load(config_file);
        } catch (std::runtime_error& ex) {
            Logger::Error("%s", ex.what());
            exit(1);
        }
    }

    std::string data_dir = AUOMS_DATA_DIR;
    std::string run_dir = AUOMS_RUN_DIR;

    if (config.HasKey("data_dir")) {
        data_dir = config.GetString("data_dir");
    }

    if (config.HasKey("run_dir")) {
        run_dir = config.GetString("run_dir");
    }

    std::string socket_path = run_dir + "/input.socket";

    std::string queue_dir = data_dir + "/collect_queue";

    if (config.HasKey("socket_path")) {
        socket_path = config.GetString("socket_path");
    }

    if (config.HasKey("queue_dir")) {
        queue_dir = config.GetString("queue_dir");
    }

    if (queue_dir.empty()) {
        Logger::Error("Invalid 'queue_file' value");
        exit(1);
    }

    size_t raw_queue_segment_size = 1024*1024;
    size_t num_raw_queue_segments = 10;

    int num_priorities = 8;
    size_t max_file_data_size = 1024*1024;
    size_t max_unsaved_files = 64;
    size_t max_fs_bytes = 128*1024*1024;
    double max_fs_pct = 10;
    double min_fs_free_pct = 5;
    long save_delay = 250;

    if (config.HasKey("raw_queue_segment_size")) {
        raw_queue_segment_size = config.GetUint64("raw_queue_segment_size");
    }

    if (config.HasKey("num_raw_queue_segments")) {
        num_raw_queue_segments = config.GetUint64("num_raw_queue_segments");
    }

    if (config.HasKey("queue_num_priorities")) {
        num_priorities = config.GetUint64("queue_num_priorities");
    }

    if (config.HasKey("queue_max_file_data_size")) {
        max_file_data_size = config.GetUint64("queue_max_file_data_size");
    }

    if (config.HasKey("queue_max_unsaved_files")) {
        max_unsaved_files = config.GetUint64("queue_max_unsaved_files");
    }

    if (config.HasKey("queue_max_fs_bytes")) {
        max_fs_bytes = config.GetUint64("queue_max_fs_bytes");
    }

    if (config.HasKey("queue_max_fs_pct")) {
        max_fs_pct = config.GetDouble("queue_max_fs_pct");
    }

    if (config.HasKey("queue_min_fs_free_pct")) {
        min_fs_free_pct = config.GetDouble("queue_min_fs_free_pct");
    }

    if (config.HasKey("queue_save_delay")) {
        save_delay = config.GetUint64("queue_save_delay");
    }

    std::string lock_file = data_dir + "/auomscollect.lock";

    if (config.HasKey("lock_file")) {
        lock_file = config.GetString("lock_file");
    }

    uint64_t rss_limit = 256L*1024L*1024L;
    uint64_t virt_limit = 1024L*1024L*1024L;
    double rss_pct_limit = 2;

    if (config.HasKey("rss_limit")) {
        rss_limit = config.GetUint64("rss_limit");
    }

    if (config.HasKey("rss_pct_limit")) {
        rss_pct_limit = config.GetDouble("rss_pct_limit");
    }

    if (config.HasKey("virt_limit")) {
        virt_limit = config.GetUint64("virt_limit");
    }

    int cpu_nice = -20;
    if (config.HasKey("cpu_nice")) {
        cpu_nice = config.GetInt64("cpu_nice");
    }

    bool use_syslog = true;
    if (config.HasKey("use_syslog")) {
        use_syslog = config.GetBool("use_syslog");
    }

    if (use_syslog) {
        Logger::OpenSyslog("auomscollect", LOG_DAEMON);
    }

    bool disable_cgroups = false;
    if (config.HasKey("disable_cgroups")) {
        disable_cgroups = config.GetBool("disable_cgroups");
    }

    // Set cgroup defaults
    if (!config.HasKey(CPU_SOFT_LIMIT_NAME)) {
        config.SetString(CPU_SOFT_LIMIT_NAME, "3");
    }

    if (!config.HasKey(CPU_HARD_LIMIT_NAME)) {
        config.SetString(CPU_HARD_LIMIT_NAME, "20");
    }

    // Set EventPrioritizer defaults
    if (!config.HasKey("event_priority_by_syscall")) {
        config.SetString("event_priority_by_syscall", R"json({"execve":2,"execveat":2,"*":3})json");
    }

    if (!config.HasKey("event_priority_by_record_type_category")) {
        config.SetString("event_priority_by_record_type_category", R"json({"AUOMS_MSG":0, "USER_MSG":1,"SELINUX":1,"APPARMOR":1})json");
    }

    int default_priority = 4;
    if (config.HasKey("default_event_priority")) {
        default_priority = static_cast<uint16_t>(config.GetUint64("default_event_priority"));
    }
    if (default_priority > num_priorities-1) {
        default_priority = num_priorities-1;
    }

    auto event_prioritizer = std::make_shared<EventPrioritizer>(default_priority);
    if (!event_prioritizer->LoadFromConfig(config)) {
        Logger::Error("Failed to load EventPrioritizer config, exiting");
        exit(1);
    }

    Logger::Info("Trying to acquire singleton lock");
    LockFile singleton_lock(lock_file);
    switch(singleton_lock.Lock()) {
        case LockFile::FAILED:
            Logger::Error("Failed to acquire singleton lock (%s): %s", lock_file.c_str(), std::strerror(errno));
            exit(1);
            break;
        case LockFile::PREVIOUSLY_ABANDONED:
            Logger::Warn("Previous instance did not exit cleanly");
            break;
        case LockFile::INTERRUPTED:
            Logger::Error("Failed to acquire singleton lock (%s): Interrupted", lock_file.c_str());
            exit(1);
            break;
    }
    Logger::Info("Acquire singleton lock");

    std::atomic_long ingest_thread_id(0);
    std::shared_ptr<CGroupCPU> cgcpu_root;
    std::shared_ptr<CGroupCPU> cgcpu;
    if (!disable_cgroups) {
        try {
            cgcpu_root = CGroups::OpenCPU("");
            cgcpu = CPULimits::CGFromConfig(config, "auomscollect");
            // systemd may not have put auomscollect into the default cgroup at this point
            // Wait a few seconds before moving into the right cgroup so we avoid getting moved back out by systemd
            std::thread cg_thread([&cgcpu_root,&cgcpu,&ingest_thread_id]() {
                Signals::InitThread();
                int sleep_time = 10;
                // Loop forever to make sure we stay in our cgroup
                while (!Signals::IsExit()) {
                    sleep(sleep_time);
                    sleep_time = 60;
                    try {
                        cgcpu->AddSelf();
                    } catch (const std::exception &ex) {
                        Logger::Error("Failed to configure cpu cgroup: %s", ex.what());
                        Logger::Warn("CPU Limits cannot be enforced");
                        return;
                    }
                    long tid = ingest_thread_id.load();
                    if (tid != 0) {
                        try {
                            cgcpu_root->AddThread(tid);
                        } catch (std::runtime_error &ex) {
                            Logger::Error("Failed to move ingest thread to root cgroup: %s", ex.what());
                            // Set the id back to 0 so we don't keep trying.
                            ingest_thread_id.store(0);
                        }
                    }
                }
            });
            cg_thread.detach();
        } catch (std::runtime_error &ex) {
            Logger::Error("Failed to configure cpu cgroup: %s", ex.what());
            Logger::Warn("CPU Limits cannot be enforced");
        }
    }

    if (!SetProcNice(cpu_nice)) {
        Logger::Warn("Failed to set CPU nice value to %d: %s", cpu_nice, std::strerror(errno));
    }

    // This will block signals like SIGINT and SIGTERM
    // They will be handled once Signals::Start() is called.
    Signals::Init();

    SPSCDataQueue raw_queue(raw_queue_segment_size, num_raw_queue_segments);

    Logger::Info("Opening queue: %s", queue_dir.c_str());
    auto queue = PriorityQueue::Open(queue_dir, num_priorities, max_file_data_size, max_unsaved_files, max_fs_bytes, max_fs_pct, min_fs_free_pct);
    if (!queue) {
        Logger::Error("Failed to open queue '%s'", queue_dir.c_str());
        exit(1);
    }

    auto event_queue = std::make_shared<EventQueue>(queue);
    auto builder = std::make_shared<EventBuilder>(event_queue, event_prioritizer);

    auto metrics = std::make_shared<Metrics>("auomscollect", queue);
    metrics->Start();

    auto proc_metrics = std::make_shared<ProcMetrics>("auomscollect", queue, metrics, rss_limit, virt_limit, rss_pct_limit, []() {
        Logger::Error("A memory limit was exceeded, exiting immediately");
        exit(1);
    });
    proc_metrics->Start();

    RawEventAccumulator accumulator (builder, metrics);

    auto output_config = std::make_unique<Config>(std::unordered_map<std::string, std::string>({
        {"output_format","raw"},
        {"output_socket", socket_path},
        {"enable_ack_mode", "true"},
        {"ack_queue_size", "100"}
    }));
    auto writer_factory = std::shared_ptr<IEventWriterFactory>(static_cast<IEventWriterFactory*>(new RawOnlyEventWriterFactory()));
    Output output("output", queue, writer_factory, nullptr);
    output.Load(output_config);

    std::thread autosave_thread([&]() {
        Signals::InitThread();
        try {
            queue->Saver(save_delay);
        } catch (const std::exception& ex) {
            Logger::Error("Unexpected exception in autosave thread: %s", ex.what());
            exit(1);
        }
    });

    auto ingest_bytes_metric = metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "ingest", "bytes", MetricPeriod::SECOND, MetricPeriod::HOUR);
    auto ingest_records_metric = metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "ingest", "records", MetricPeriod::SECOND, MetricPeriod::HOUR);
    auto lost_bytes_metric = metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "ingest", "lost_bytes", MetricPeriod::SECOND, MetricPeriod::HOUR);
    auto lost_segments_metric = metrics->AddMetric(MetricType::METRIC_BY_ACCUMULATION, "ingest", "lost_segments", MetricPeriod::SECOND, MetricPeriod::HOUR);

    std::thread proc_thread([&]() {
        std::unique_ptr<RawEventRecord> record = std::make_unique<RawEventRecord>();
        uint8_t* ptr;
        ssize_t size;

        while((size = raw_queue.Get(&ptr)) > 0) {
            auto data_ptr = reinterpret_cast<char*>(ptr)+sizeof(RecordType);
            auto data_size = size-sizeof(RecordType);
            if (data_size <= RawEventRecord::MAX_RECORD_SIZE) {
                memcpy(record->Data(), data_ptr, data_size);
                if (record->Parse(*reinterpret_cast<RecordType*>(ptr), data_size)) {
                    accumulator.AddRecord(std::move(record));
                    record = std::make_unique<RawEventRecord>();
                } else {
                    Logger::Warn("Received unparsable event data: '%s'", std::string(record->Data(), size).c_str());
                }
            } else {
                Logger::Warn("Received event data size (%ld) exceeded size limit (%ld)", data_size, RawEventRecord::MAX_RECORD_SIZE);
            }
            raw_queue.Release();
        }
    });

    // Start signal handling thread
    Signals::Start();
    output.Start();

    // The ingest tasks needs to run outside cgroup limits
    std::thread ingest_thread([&]() {
        Signals::InitThread();
        auto thread_id = CGroups::GetSelfThreadId();
        Logger::Info("Starting ingest thead (%ld)", thread_id);
        ingest_thread_id.store(thread_id);
        if (netlink_mode) {
            bool restart;
            do {
                restart = DoNetlinkCollection(raw_queue, ingest_bytes_metric, ingest_records_metric, lost_bytes_metric,
                                              lost_segments_metric);
            } while (restart);
        } else {
            DoStdinCollection(raw_queue, ingest_bytes_metric, ingest_records_metric, lost_bytes_metric,
                              lost_segments_metric);
        }
    });
    ingest_thread.join();

    Logger::Info("Exiting");

    try {
        raw_queue.Close();
        proc_thread.join();
        proc_metrics->Stop();
        metrics->Stop();
        accumulator.Flush(0);
        if (stop_delay > 0) {
            Logger::Info("Waiting %d seconds for output to flush", stop_delay);
            sleep(stop_delay);
        }
        output.Stop();
        metrics->FlushLogMetrics();
        queue->Close(); // Close queue, this will trigger exit of autosave thread
        autosave_thread.join(); // Wait for autosave thread to exit
    } catch (const std::exception& ex) {
        Logger::Error("Unexpected exception during exit: %s", ex.what());
        exit(1);
    } catch (...) {
        Logger::Error("Unexpected exception during exit");
        exit(1);
    }

    singleton_lock.Unlock();

    exit(0);
}