in syncd/FlexCounter.cpp [708:849]
/**
 * @brief Apply a per-counter-prefix bulk chunk size configuration.
 *
 * The raw string is always remembered in m_bulkChunkSizePerPrefix. Nothing else is
 * done when parseBulkChunkSizePerPrefixConfigString() rejects the string or when no
 * bulk context has been created yet. Otherwise one of three actions is taken:
 *
 *  - Exactly one bulk context exists: if m_counterChunkSizeMapFromPrefix is empty,
 *    only the context's chunk size is reset to default_bulk_chunk_size; otherwise the
 *    context's counter IDs are split into one partition per prefix plus a default
 *    partition.
 *  - Several bulk contexts exist and m_counterChunkSizeMapFromPrefix is empty: the
 *    contexts are merged back into the one named "default", provided they all cover
 *    the same set of object VIDs.
 *  - Several bulk contexts exist and m_counterChunkSizeMapFromPrefix is not empty:
 *    only the chunk size of each non-default context is updated.
 *
 * On success a per-partition summary is logged at INFO level.
 *
 * @param bulkChunkSizePerPrefix Configuration string; its format is whatever
 *        parseBulkChunkSizePerPrefixConfigString() accepts (parser not visible here;
 *        presumably it fills m_counterChunkSizeMapFromPrefix - TODO confirm).
 */
void setBulkChunkSizePerPrefix(
    _In_ const std::string& bulkChunkSizePerPrefix) override
{
    SWSS_LOG_ENTER();

    m_bulkChunkSizePerPrefix = bulkChunkSizePerPrefix;

    // No operation if the input string is invalid or no bulk context has been created
    if (!parseBulkChunkSizePerPrefixConfigString(bulkChunkSizePerPrefix) || m_bulkContexts.empty())
    {
        return;
    }

    if (m_bulkContexts.size() == 1)
    {
        // Only one bulk context exists which means it is the first time per counter chunk size
        // is configured and a unified counter ID set is polled for all objects
        auto it = m_bulkContexts.begin();
        std::shared_ptr<BulkContextType> singleBulkContext = it->second;
        const std::vector<StatType> &allCounterIds = singleBulkContext->counter_ids;
        std::map<std::string, std::vector<StatType>> counterChunkSizePerPrefix;
        std::vector<StatType> defaultPartition;

        if (m_counterChunkSizeMapFromPrefix.empty())
        {
            // There is still no per counter prefix chunk size configured as the chunk size map is still empty.
            singleBulkContext->default_bulk_chunk_size = default_bulk_chunk_size;
        }
        else
        {
            // Split the counter IDs according to the counter ID prefix mapping and store them into m_bulkContexts
            SWSS_LOG_NOTICE("Split counter IDs set by prefix for the first time %s", bulkChunkSizePerPrefix.c_str());
            createCounterBulkChunkSizePerPrefixPartition(allCounterIds, counterChunkSizePerPrefix, defaultPartition, true);

            for (auto &counterPrefix : counterChunkSizePerPrefix)
            {
                std::sort(counterPrefix.second.begin(), counterPrefix.second.end());
                auto bulkContext = getBulkStatsContext(counterPrefix.second, counterPrefix.first, m_counterChunkSizeMapFromPrefix[counterPrefix.first]);

                // Every new partition covers the same objects as the original context;
                // only the counter ID subset (and hence the counters buffer size) differs.
                bulkContext->counter_ids = std::move(counterPrefix.second);
                bulkContext->object_statuses.resize(singleBulkContext->object_statuses.size());
                bulkContext->object_vids = singleBulkContext->object_vids;
                bulkContext->object_keys = singleBulkContext->object_keys;
                bulkContext->counters.resize(bulkContext->counter_ids.size() * bulkContext->object_vids.size());
                SWSS_LOG_INFO("Re-initializing counter partition %s", counterPrefix.first.c_str());
            }

            // Counter IDs that matched no prefix stay with the original context.
            std::sort(defaultPartition.begin(), defaultPartition.end());
            setBulkStatsContext(defaultPartition, singleBulkContext);
            singleBulkContext->counters.resize(singleBulkContext->counter_ids.size() * singleBulkContext->object_vids.size());

            // NOTE(review): `it` must still be valid after the insertions above - that holds
            // if m_bulkContexts is a std::map; confirm against the member declaration.
            m_bulkContexts.erase(it);
            SWSS_LOG_INFO("Removed the previous default counter partition");
        }
    }
    else if (m_counterChunkSizeMapFromPrefix.empty())
    {
        // There have been multiple bulk contexts which can result from
        // 1. per counter prefix chunk size configuration
        // 2. different objects support different counter ID set
        // And there is no per counter prefix chunk size configured any more
        // Multiple bulk contexts will be merged into one if they share the same object IDs set, which means case (1).
        std::set<sai_object_id_t> oid_set;
        std::vector<StatType> counter_ids;
        std::shared_ptr<BulkContextType> defaultBulkContext;

        for (auto &context : m_bulkContexts)
        {
            if (oid_set.empty())
            {
                oid_set.insert(context.second->object_vids.begin(), context.second->object_vids.end());
            }
            else
            {
                std::set<sai_object_id_t> tmp_oid_set(context.second->object_vids.begin(), context.second->object_vids.end());
                if (tmp_oid_set != oid_set)
                {
                    SWSS_LOG_ERROR("Can not merge partition because they contains different objects");
                    return;
                }
            }

            if (context.second->name == "default")
            {
                defaultBulkContext = context.second;
            }

            counter_ids.insert(counter_ids.end(), context.second->counter_ids.begin(), context.second->counter_ids.end());
        }

        // Without a "default" partition there is nothing to merge into. Bail out before
        // touching m_bulkContexts instead of dereferencing a null pointer below.
        if (!defaultBulkContext)
        {
            SWSS_LOG_ERROR("Can not merge partition because the default partition does not exist");
            return;
        }

        // The local shared_ptr keeps the default context alive across the clear().
        m_bulkContexts.clear();
        std::sort(counter_ids.begin(), counter_ids.end());
        setBulkStatsContext(counter_ids, defaultBulkContext);
        defaultBulkContext->counters.resize(defaultBulkContext->counter_ids.size() * defaultBulkContext->object_vids.size());
    }
    else
    {
        // Multiple bulk contexts and per counter prefix chunk size
        // Update the chunk size only in this case.
        SWSS_LOG_NOTICE("Update bulk chunk size only %s", bulkChunkSizePerPrefix.c_str());

        // Work on a copy so that matched prefixes can be removed; whatever is left at the
        // end names prefixes that were configured but have no partition.
        auto counterChunkSizeMapFromPrefix = m_counterChunkSizeMapFromPrefix;

        for (auto &bulkStatsContext : m_bulkContexts)
        {
            auto const &name = bulkStatsContext.second->name;
            if (name == "default")
            {
                continue;
            }

            auto const searchRef = counterChunkSizeMapFromPrefix.find(name);
            if (searchRef != counterChunkSizeMapFromPrefix.end())
            {
                auto const &chunkSize = searchRef->second;
                SWSS_LOG_INFO("Reset counter prefix %s chunk size %d", name.c_str(), chunkSize);
                bulkStatsContext.second->default_bulk_chunk_size = chunkSize;
                counterChunkSizeMapFromPrefix.erase(searchRef);
            }
            else
            {
                SWSS_LOG_WARN("Update bulk chunk size: bulk chunk size for prefix %s is not provided", name.c_str());
            }
        }

        for (auto &it : counterChunkSizeMapFromPrefix)
        {
            SWSS_LOG_WARN("Update bulk chunk size: prefix %s does not exist", it.first.c_str());
        }
    }

    for (auto &it : m_bulkContexts)
    {
        auto &context = *it.second;
        // size() returns size_t, so the matching printf conversion is %zu (not %d).
        SWSS_LOG_INFO("%s %s partition %s number of OIDs %zu number of counter IDs %zu number of counters %zu",
                      m_name.c_str(),
                      m_instanceId.c_str(),
                      context.name.c_str(),
                      context.object_keys.size(),
                      context.counter_ids.size(),
                      context.counters.size());
    }
}