void setBulkChunkSizePerPrefix()

in syncd/FlexCounter.cpp [708:849]


    /**
     * @brief Apply a new per-counter-prefix bulk chunk size configuration.
     *
     * Stores the raw configuration string and, when it parses successfully and
     * bulk contexts already exist, re-partitions them:
     *  - single context + per-prefix sizes newly configured: split the unified
     *    counter ID set into one context per configured prefix plus a default
     *    partition holding the leftover counter IDs;
     *  - multiple contexts + per-prefix configuration removed: merge them back
     *    into the "default" context — only possible when every context polls
     *    the same object ID set (i.e. the split came from this configuration);
     *  - multiple contexts + per-prefix sizes updated: refresh each context's
     *    chunk size in place, warning about unmatched prefixes on either side.
     *
     * @param bulkChunkSizePerPrefix Configuration string in the format accepted
     *        by parseBulkChunkSizePerPrefixConfigString.
     */
    void setBulkChunkSizePerPrefix(
        _In_ const std::string& bulkChunkSizePerPrefix) override
    {
        SWSS_LOG_ENTER();

        m_bulkChunkSizePerPrefix = bulkChunkSizePerPrefix;

        // No operation if the input string is invalid or no bulk context has been created
        if (!parseBulkChunkSizePerPrefixConfigString(bulkChunkSizePerPrefix) || m_bulkContexts.empty())
        {
            return;
        }

        if (m_bulkContexts.size() == 1)
        {
            // Only one bulk context exists which means
            // it is the first time per counter chunk size is configured and a unified counter ID set is polled for all objects
            auto it = m_bulkContexts.begin();
            std::shared_ptr<BulkContextType> singleBulkContext = it->second;
            const std::vector<StatType> &allCounterIds = singleBulkContext->counter_ids;
            std::map<std::string, std::vector<StatType>> counterChunkSizePerPrefix;
            std::vector<StatType> defaultPartition;

            if (m_counterChunkSizeMapFromPrefix.empty())
            {
                // There is still no per counter prefix chunk size configured as the chunk size map is still empty.
                singleBulkContext->default_bulk_chunk_size = default_bulk_chunk_size;
            }
            else
            {
                // Split the counter IDs according to the counter ID prefix mapping and store them into m_bulkContexts
                SWSS_LOG_NOTICE("Split counter IDs set by prefix for the first time %s", bulkChunkSizePerPrefix.c_str());
                createCounterBulkChunkSizePerPrefixPartition(allCounterIds, counterChunkSizePerPrefix, defaultPartition, true);

                for (auto &counterPrefix : counterChunkSizePerPrefix)
                {
                    std::sort(counterPrefix.second.begin(), counterPrefix.second.end());
                    auto bulkContext = getBulkStatsContext(counterPrefix.second, counterPrefix.first, m_counterChunkSizeMapFromPrefix[counterPrefix.first]);

                    // Each new partition polls the same objects as the original unified context.
                    bulkContext->counter_ids = std::move(counterPrefix.second);
                    bulkContext->object_statuses.resize(singleBulkContext->object_statuses.size());
                    bulkContext->object_vids = singleBulkContext->object_vids;
                    bulkContext->object_keys = singleBulkContext->object_keys;
                    bulkContext->counters.resize(bulkContext->counter_ids.size() * bulkContext->object_vids.size());

                    SWSS_LOG_INFO("Re-initializing counter partition %s", counterPrefix.first.c_str());
                }

                // Counter IDs not matching any configured prefix stay in the
                // (re-keyed) default partition; drop the old map entry afterwards.
                std::sort(defaultPartition.begin(), defaultPartition.end());
                setBulkStatsContext(defaultPartition, singleBulkContext);
                singleBulkContext->counters.resize(singleBulkContext->counter_ids.size() * singleBulkContext->object_vids.size());
                m_bulkContexts.erase(it);
                SWSS_LOG_INFO("Removed the previous default counter partition");
            }
        }
        else if (m_counterChunkSizeMapFromPrefix.empty())
        {
            // There have been multiple bulk contexts which can result from
            // 1. per counter prefix chunk size configuration
            // 2. different objects support different counter ID set
            // And there is no per counter prefix chunk size configured any more
            // Multiple bulk contexts will be merged into one if they share the same object IDs set, which means case (1).
            std::set<sai_object_id_t> oid_set;
            std::vector<StatType> counter_ids;
            std::shared_ptr<BulkContextType> defaultBulkContext;
            for (auto &context : m_bulkContexts)
            {
                if (oid_set.empty())
                {
                    oid_set.insert(context.second->object_vids.begin(), context.second->object_vids.end());
                }
                else
                {
                    std::set<sai_object_id_t> tmp_oid_set(context.second->object_vids.begin(), context.second->object_vids.end());
                    if (tmp_oid_set != oid_set)
                    {
                        SWSS_LOG_ERROR("Can not merge partitions because they contain different objects");
                        return;
                    }
                }
                if (context.second->name == "default")
                {
                    defaultBulkContext = context.second;
                }
                counter_ids.insert(counter_ids.end(), context.second->counter_ids.begin(), context.second->counter_ids.end());
            }

            if (!defaultBulkContext)
            {
                // Defensive: without a "default" partition there is no context to
                // merge into. Bail out BEFORE clearing the map, otherwise all
                // polling state would be lost and the dereference below would crash.
                SWSS_LOG_ERROR("Can not merge partitions because no default partition exists");
                return;
            }

            m_bulkContexts.clear();

            std::sort(counter_ids.begin(), counter_ids.end());
            setBulkStatsContext(counter_ids, defaultBulkContext);
            defaultBulkContext->counters.resize(defaultBulkContext->counter_ids.size() * defaultBulkContext->object_vids.size());
        }
        else
        {
            // Multiple bulk contexts and per counter prefix chunk size
            // Update the chunk size only in this case.
            SWSS_LOG_NOTICE("Update bulk chunk size only %s", bulkChunkSizePerPrefix.c_str());

            // Work on a copy so consumed entries can be erased and the
            // remainder reported as unmatched prefixes afterwards.
            auto counterChunkSizeMapFromPrefix = m_counterChunkSizeMapFromPrefix;
            for (auto &bulkStatsContext : m_bulkContexts)
            {
                auto const &name = bulkStatsContext.second->name;

                if (name == "default")
                {
                    continue;
                }

                auto const &searchRef = counterChunkSizeMapFromPrefix.find(name);
                if (searchRef != counterChunkSizeMapFromPrefix.end())
                {
                    auto const &chunkSize = searchRef->second;

                    SWSS_LOG_INFO("Reset counter prefix %s chunk size %d", name.c_str(), chunkSize);
                    bulkStatsContext.second->default_bulk_chunk_size = chunkSize;
                    counterChunkSizeMapFromPrefix.erase(searchRef);
                }
                else
                {
                    SWSS_LOG_WARN("Update bulk chunk size: bulk chunk size for prefix %s is not provided", name.c_str());
                }
            }

            for (auto &it : counterChunkSizeMapFromPrefix)
            {
                SWSS_LOG_WARN("Update bulk chunk size: prefix %s does not exist", it.first.c_str());
            }
        }

        for (auto &it : m_bulkContexts)
        {
            auto &context = *it.second;
            // size() yields size_t: %zu (not %d) avoids printf-format UB on 64-bit.
            SWSS_LOG_INFO("%s %s partition %s number of OIDs %zu number of counter IDs %zu number of counters %zu",
                          m_name.c_str(),
                          m_instanceId.c_str(),
                          context.name.c_str(),
                          context.object_keys.size(),
                          context.counter_ids.size(),
                          context.counters.size());
        }
    }