syncd/NotificationProcessor.cpp (592 lines of code) (raw):

#include "NotificationProcessor.h" #include "RedisClient.h" #include "sairediscommon.h" #include "meta/sai_serialize.h" #include "meta/SaiAttributeList.h" #include "swss/logger.h" #include "swss/notificationproducer.h" #include <inttypes.h> using namespace syncd; using namespace saimeta; NotificationProcessor::NotificationProcessor( _In_ std::shared_ptr<NotificationProducerBase> producer, _In_ std::shared_ptr<RedisClient> client, _In_ std::function<void(const swss::KeyOpFieldsValuesTuple&)> synchronizer): m_synchronizer(synchronizer), m_client(client), m_notifications(producer) { SWSS_LOG_ENTER(); m_runThread = false; m_notificationQueue = std::make_shared<NotificationQueue>(); } NotificationProcessor::~NotificationProcessor() { SWSS_LOG_ENTER(); stopNotificationsProcessingThread(); } void NotificationProcessor::sendNotification( _In_ const std::string& op, _In_ const std::string& data, _In_ std::vector<swss::FieldValueTuple> entry) { SWSS_LOG_ENTER(); SWSS_LOG_INFO("%s %s", op.c_str(), data.c_str()); m_notifications->send(op, data, entry); SWSS_LOG_DEBUG("notification send successfully"); } void NotificationProcessor::sendNotification( _In_ const std::string& op, _In_ const std::string& data) { SWSS_LOG_ENTER(); std::vector<swss::FieldValueTuple> entry; sendNotification(op, data, entry); } void NotificationProcessor::process_on_switch_state_change( _In_ sai_object_id_t switch_rid, _In_ sai_switch_oper_status_t switch_oper_status) { SWSS_LOG_ENTER(); sai_object_id_t switch_vid = m_translator->translateRidToVid(switch_rid, SAI_NULL_OBJECT_ID); auto s = sai_serialize_switch_oper_status(switch_vid, switch_oper_status); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_SWITCH_STATE_CHANGE, s); } sai_fdb_entry_type_t NotificationProcessor::getFdbEntryType( _In_ uint32_t count, _In_ const sai_attribute_t *list) { SWSS_LOG_ENTER(); for (uint32_t idx = 0; idx < count; idx++) { const sai_attribute_t &attr = list[idx]; if (attr.id == SAI_FDB_ENTRY_ATTR_TYPE) { return (sai_fdb_entry_type_t)attr.value.s32; } } SWSS_LOG_WARN("unknown fdb entry type"); int ret = -1; return (sai_fdb_entry_type_t)ret; } void NotificationProcessor::redisPutFdbEntryToAsicView( _In_ const sai_fdb_event_notification_data_t *fdb) { SWSS_LOG_ENTER(); // NOTE: this fdb entry already contains translated RID to VID std::vector<swss::FieldValueTuple> entry; entry = SaiAttributeList::serialize_attr_list( SAI_OBJECT_TYPE_FDB_ENTRY, fdb->attr_count, fdb->attr, false); sai_object_meta_key_t metaKey; metaKey.objecttype = SAI_OBJECT_TYPE_FDB_ENTRY; metaKey.objectkey.key.fdb_entry = fdb->fdb_entry; std::string strFdbEntry = sai_serialize_fdb_entry(fdb->fdb_entry); if ((fdb->fdb_entry.switch_id == SAI_NULL_OBJECT_ID || fdb->fdb_entry.bv_id == SAI_NULL_OBJECT_ID) && (fdb->event_type != SAI_FDB_EVENT_FLUSHED)) { SWSS_LOG_WARN("skipped to put int db: %s", strFdbEntry.c_str()); return; } if (fdb->event_type == SAI_FDB_EVENT_AGED) { SWSS_LOG_DEBUG("remove fdb entry %s for SAI_FDB_EVENT_AGED", sai_serialize_object_meta_key(metaKey).c_str()); m_client->removeAsicObject(metaKey); return; } if (fdb->event_type == SAI_FDB_EVENT_FLUSHED) { sai_object_id_t bv_id = fdb->fdb_entry.bv_id; sai_object_id_t port_oid = 0; sai_fdb_flush_entry_type_t type = SAI_FDB_FLUSH_ENTRY_TYPE_DYNAMIC; for (uint32_t i = 0; i < fdb->attr_count; i++) { if (fdb->attr[i].id == SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID) { port_oid = fdb->attr[i].value.oid; } else if (fdb->attr[i].id == SAI_FDB_ENTRY_ATTR_TYPE) { type = (fdb->attr[i].value.s32 == SAI_FDB_ENTRY_TYPE_STATIC) ? SAI_FDB_FLUSH_ENTRY_TYPE_STATIC : SAI_FDB_FLUSH_ENTRY_TYPE_DYNAMIC; } } m_client->processFlushEvent(fdb->fdb_entry.switch_id, port_oid, bv_id, type); return; } if (fdb->event_type == SAI_FDB_EVENT_LEARNED || fdb->event_type == SAI_FDB_EVENT_MOVE) { if (fdb->event_type == SAI_FDB_EVENT_MOVE) { SWSS_LOG_DEBUG("remove fdb entry %s for SAI_FDB_EVENT_MOVE", sai_serialize_object_meta_key(metaKey).c_str()); m_client->removeAsicObject(metaKey); } // currently we need to add type manually since fdb event don't contain type sai_attribute_t attr; attr.id = SAI_FDB_ENTRY_ATTR_TYPE; attr.value.s32 = SAI_FDB_ENTRY_TYPE_DYNAMIC; auto objectType = SAI_OBJECT_TYPE_FDB_ENTRY; auto meta = sai_metadata_get_attr_metadata(objectType, attr.id); if (meta == NULL) { SWSS_LOG_THROW("unable to get metadata for object type %s, attribute %d", sai_serialize_object_type(objectType).c_str(), attr.id); /* * TODO We should notify orch agent here. And also this probably should * not be here, but on redis side, getting through metadata. */ } std::string strAttrId = sai_serialize_attr_id(*meta); std::string strAttrValue = sai_serialize_attr_value(*meta, attr); entry.emplace_back(strAttrId, strAttrValue); m_client->createAsicObject(metaKey, entry); return; } SWSS_LOG_ERROR("event type %s not supported, FIXME", sai_serialize_fdb_event(fdb->event_type).c_str()); } /** * @Brief Check FDB event notification data. * * Every OID field in notification data as well as all OID attributes are * checked if given OID (returned from ASIC) is already present in the syncd * local database. All bridge ports, vlans should be already discovered by * syncd discovery logic. If vendor SAI will return unknown/invalid OID, this * function will return false. * * @param data FDB event notification data * * @return False if any of OID values is not present in local DB, otherwise * true. */ bool NotificationProcessor::check_fdb_event_notification_data( _In_ const sai_fdb_event_notification_data_t& data) { SWSS_LOG_ENTER(); /* * Any new RID value spotted in fdb notification can happen for 2 reasons: * * - a bug is present on the vendor SAI, all RID's are already in local or * REDIS ASIC DB but vendor SAI returned new or invalid RID * * - orch agent didn't query yet bridge ID/vlan ID and already * started to receive fdb notifications in which case warn message * could be ignored. * * If vendor SAI will return invalid RID, then this will later on lead to * inconsistent DB state and possible failure on apply view after cold or * warm boot. * * On switch init we do discover phase, and we should discover all objects * so we should not get any of those messages if SAI is in consistent * state. */ bool result = true; if (!m_translator->checkRidExists(data.fdb_entry.bv_id, true)) { SWSS_LOG_ERROR("bv_id RID 0x%" PRIx64 " is not present on local ASIC DB: %s", data.fdb_entry.bv_id, sai_serialize_fdb_entry(data.fdb_entry).c_str()); result = false; } if (!m_translator->checkRidExists(data.fdb_entry.switch_id) || data.fdb_entry.switch_id == SAI_NULL_OBJECT_ID) { SWSS_LOG_ERROR("switch_id RID 0x%" PRIx64 " is not present on local ASIC DB: %s", data.fdb_entry.switch_id, sai_serialize_fdb_entry(data.fdb_entry).c_str()); result = false; } for (uint32_t i = 0; i < data.attr_count; i++) { const sai_attribute_t& attr = data.attr[i]; auto meta = sai_metadata_get_attr_metadata(SAI_OBJECT_TYPE_FDB_ENTRY, attr.id); if (meta == NULL) { SWSS_LOG_ERROR("unable to get metadata for fdb_entry attr.id = %d", attr.id); continue; } // skip non oid attributes if (meta->attrvaluetype != SAI_ATTR_VALUE_TYPE_OBJECT_ID) continue; if (!m_translator->checkRidExists(attr.value.oid, true)) { SWSS_LOG_WARN("RID 0x%" PRIx64 " on %s is not present on local ASIC DB", attr.value.oid, meta->attridname); result = false; } } return result; } bool NotificationProcessor::contains_fdb_flush_event( _In_ uint32_t count, _In_ const sai_fdb_event_notification_data_t *data) { SWSS_LOG_ENTER(); sai_mac_t mac = { 0, 0, 0, 0, 0, 0 }; for (uint32_t idx = 0; idx < count; idx++) { if (memcmp(mac, data[idx].fdb_entry.mac_address, sizeof(mac)) == 0) return true; } return false; } void NotificationProcessor::process_on_fdb_event( _In_ uint32_t count, _In_ sai_fdb_event_notification_data_t *data) { SWSS_LOG_ENTER(); SWSS_LOG_INFO("fdb event count: %u", count); bool sendntf = true; for (uint32_t i = 0; i < count; i++) { sai_fdb_event_notification_data_t *fdb = &data[i]; sendntf &= check_fdb_event_notification_data(*fdb); if (!sendntf) { SWSS_LOG_ERROR("invalid OIDs in fdb notifications, NOT translating and NOT storing in ASIC DB"); continue; } SWSS_LOG_DEBUG("fdb %u: type: %d", i, fdb->event_type); fdb->fdb_entry.switch_id = m_translator->translateRidToVid(fdb->fdb_entry.switch_id, SAI_NULL_OBJECT_ID); fdb->fdb_entry.bv_id = m_translator->translateRidToVid(fdb->fdb_entry.bv_id, fdb->fdb_entry.switch_id, true); m_translator->translateRidToVid(SAI_OBJECT_TYPE_FDB_ENTRY, fdb->fdb_entry.switch_id, fdb->attr_count, fdb->attr, true); /* * Currently because of brcm bug, we need to install fdb entries in * asic view and currently this event don't have fdb type which is * required on creation. */ redisPutFdbEntryToAsicView(fdb); } if (sendntf) { std::string s = sai_serialize_fdb_event_ntf(count, data); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT, s); } else { SWSS_LOG_ERROR("FDB notification was not sent since it contain invalid OIDs, bug?"); } } /** * @Brief Check NAT event notification data. * * Every OID field in notification data as well as all OID attributes are * checked if given OID (returned from ASIC) is already present in the syncd * local database. If vendor SAI will return unknown/invalid OID, this * function will return false. * * @param data NAT event notification data * * @return False if any of OID values is not present in local DB, otherwise * true. */ bool NotificationProcessor::check_nat_event_notification_data( _In_ const sai_nat_event_notification_data_t& data) { SWSS_LOG_ENTER(); bool result = true; if (!m_translator->checkRidExists(data.nat_entry.vr_id, true)) { SWSS_LOG_ERROR("vr_id RID 0x%" PRIx64 " is not present on local ASIC DB: %s", data.nat_entry.vr_id, sai_serialize_nat_entry(data.nat_entry).c_str()); result = false; } if (!m_translator->checkRidExists(data.nat_entry.switch_id) || data.nat_entry.switch_id == SAI_NULL_OBJECT_ID) { SWSS_LOG_ERROR("switch_id RID 0x%" PRIx64 " is not present on local ASIC DB: %s", data.nat_entry.switch_id, sai_serialize_nat_entry(data.nat_entry).c_str()); result = false; } return result; } void NotificationProcessor::process_on_nat_event( _In_ uint32_t count, _In_ sai_nat_event_notification_data_t *data) { SWSS_LOG_ENTER(); SWSS_LOG_INFO("nat event count: %u", count); bool sendntf = true; for (uint32_t i = 0; i < count; i++) { sai_nat_event_notification_data_t *nat = &data[i]; sendntf &= check_nat_event_notification_data(*nat); if (!sendntf) { SWSS_LOG_ERROR("invalid OIDs in nat notifications, NOT translating and NOT storing in ASIC DB"); continue; } SWSS_LOG_DEBUG("nat %u: type: %d", i, nat->event_type); nat->nat_entry.switch_id = m_translator->translateRidToVid(nat->nat_entry.switch_id, SAI_NULL_OBJECT_ID); nat->nat_entry.vr_id = m_translator->translateRidToVid(nat->nat_entry.vr_id, nat->nat_entry.switch_id, true); } if (sendntf) { std::string s = sai_serialize_nat_event_ntf(count, data); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_NAT_EVENT, s); } else { SWSS_LOG_ERROR("NAT notification was not sent since it contain invalid OIDs, bug?"); } } void NotificationProcessor::process_on_queue_deadlock_event( _In_ uint32_t count, _In_ sai_queue_deadlock_notification_data_t *data) { SWSS_LOG_ENTER(); SWSS_LOG_DEBUG("queue deadlock notification count: %u", count); for (uint32_t i = 0; i < count; i++) { sai_queue_deadlock_notification_data_t *deadlock_data = &data[i]; /* * We are using switch_rid as null, since queue should be already * defined inside local db after creation. * * If this will be faster than return from create queue then we can use * query switch id and extract rid of switch id and then convert it to * switch vid. */ deadlock_data->queue_id = m_translator->translateRidToVid(deadlock_data->queue_id, SAI_NULL_OBJECT_ID); } std::string s = sai_serialize_queue_deadlock_ntf(count, data); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_QUEUE_PFC_DEADLOCK, s); } void NotificationProcessor::process_on_port_host_tx_ready_change( _In_ sai_object_id_t switch_id, _In_ sai_object_id_t port_id, _In_ sai_port_host_tx_ready_status_t *host_tx_ready_status) { SWSS_LOG_ENTER(); SWSS_LOG_DEBUG("Port ID before translating from RID to VID is %s", sai_serialize_object_id(port_id).c_str()); sai_object_id_t port_vid = m_translator->translateRidToVid(port_id, SAI_NULL_OBJECT_ID); SWSS_LOG_DEBUG("Port ID after translating from RID to VID is %s", sai_serialize_object_id(port_id).c_str()); sai_object_id_t switch_vid = m_translator->translateRidToVid(switch_id, SAI_NULL_OBJECT_ID); std::string s = sai_serialize_port_host_tx_ready_ntf(switch_vid, port_vid, *host_tx_ready_status); SWSS_LOG_DEBUG("Host_tx_ready status after sai_serialize is %s", s.c_str()); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_PORT_HOST_TX_READY, s); } void NotificationProcessor::process_on_port_state_change( _In_ uint32_t count, _In_ sai_port_oper_status_notification_t *data) { SWSS_LOG_ENTER(); SWSS_LOG_DEBUG("port notification count: %u", count); for (uint32_t i = 0; i < count; i++) { sai_port_oper_status_notification_t *oper_stat = &data[i]; sai_object_id_t rid = oper_stat->port_id; /* * We are using switch_rid as null, since port should be already * defined inside local db after creation. * * If this will be faster than return from create port then we can use * query switch id and extract rid of switch id and then convert it to * switch vid. */ SWSS_LOG_INFO("Port RID %s state change notification", sai_serialize_object_id(rid).c_str()); if (false == m_translator->tryTranslateRidToVid(rid, oper_stat->port_id)) { SWSS_LOG_WARN("Port RID %s transalted to null VID!!!", sai_serialize_object_id(rid).c_str()); } /* * Port may be in process of removal. OA may receive notification for VID either * SAI_NULL_OBJECT_ID or non exist at time of processing */ SWSS_LOG_INFO("Port VID %s state change notification", sai_serialize_object_id(oper_stat->port_id).c_str()); } std::string s = sai_serialize_port_oper_status_ntf(count, data); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_PORT_STATE_CHANGE, s); } void NotificationProcessor::process_on_bfd_session_state_change( _In_ uint32_t count, _In_ sai_bfd_session_state_notification_t *data) { SWSS_LOG_ENTER(); SWSS_LOG_DEBUG("bfd session state notification count: %u", count); for (uint32_t i = 0; i < count; i++) { sai_bfd_session_state_notification_t *bfd_session_state = &data[i]; /* * We are using switch_rid as null, since BFD should be already * defined inside local db after creation. * * If this will be faster than return from create BFD then we can use * query switch id and extract rid of switch id and then convert it to * switch vid. */ bfd_session_state->bfd_session_id = m_translator->translateRidToVid(bfd_session_state->bfd_session_id, SAI_NULL_OBJECT_ID, true); } std::string s = sai_serialize_bfd_session_state_ntf(count, data); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_BFD_SESSION_STATE_CHANGE, s); } void NotificationProcessor::process_on_switch_asic_sdk_health_event( _In_ sai_object_id_t switch_rid, _In_ sai_switch_asic_sdk_health_severity_t severity, _In_ sai_timespec_t timestamp, _In_ sai_switch_asic_sdk_health_category_t category, _In_ sai_switch_health_data_t data, _In_ const sai_u8_list_t description) { SWSS_LOG_ENTER(); sai_object_id_t switch_vid = m_translator->translateRidToVid(switch_rid, SAI_NULL_OBJECT_ID); std::string s = sai_serialize_switch_asic_sdk_health_event(switch_vid, severity, timestamp, category, data, description); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_SWITCH_ASIC_SDK_HEALTH_EVENT, s); } void NotificationProcessor::process_on_switch_shutdown_request( _In_ sai_object_id_t switch_rid) { SWSS_LOG_ENTER(); sai_object_id_t switch_vid = m_translator->translateRidToVid(switch_rid, SAI_NULL_OBJECT_ID); std::string s = sai_serialize_switch_shutdown_request(switch_vid); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_SWITCH_SHUTDOWN_REQUEST, s); } void NotificationProcessor::process_on_twamp_session_event( _In_ uint32_t count, _In_ sai_twamp_session_event_notification_data_t *data) { SWSS_LOG_ENTER(); SWSS_LOG_DEBUG("twamp session state notification count: %u", count); for (uint32_t i = 0; i < count; i++) { sai_twamp_session_event_notification_data_t *twamp_session_state = &data[i]; /* * We are using switch_rid as null, since TWAMP should be already * defined inside local db after creation. * * If this will be faster than return from create TWAMP then we can use * query switch id and extract rid of switch id and then convert it to * switch vid. */ twamp_session_state->twamp_session_id = m_translator->translateRidToVid(twamp_session_state->twamp_session_id, SAI_NULL_OBJECT_ID); } /* send notification to syncd */ std::string s = sai_serialize_twamp_session_event_ntf(count, data); sendNotification(SAI_SWITCH_NOTIFICATION_NAME_TWAMP_SESSION_EVENT, s); } void NotificationProcessor::handle_switch_state_change( _In_ const std::string &data) { SWSS_LOG_ENTER(); sai_switch_oper_status_t switch_oper_status; sai_object_id_t switch_id; sai_deserialize_switch_oper_status(data, switch_id, switch_oper_status); process_on_switch_state_change(switch_id, switch_oper_status); } void NotificationProcessor::handle_fdb_event( _In_ const std::string &data) { SWSS_LOG_ENTER(); uint32_t count; sai_fdb_event_notification_data_t *fdbevent = NULL; sai_deserialize_fdb_event_ntf(data, count, &fdbevent); if (contains_fdb_flush_event(count, fdbevent)) { SWSS_LOG_NOTICE("got fdb flush event: %s", data.c_str()); } process_on_fdb_event(count, fdbevent); sai_deserialize_free_fdb_event_ntf(count, fdbevent); } void NotificationProcessor::handle_nat_event( _In_ const std::string &data) { SWSS_LOG_ENTER(); uint32_t count; sai_nat_event_notification_data_t *natevent = NULL; sai_deserialize_nat_event_ntf(data, count, &natevent); process_on_nat_event(count, natevent); sai_deserialize_free_nat_event_ntf(count, natevent); } void NotificationProcessor::handle_queue_deadlock( _In_ const std::string &data) { SWSS_LOG_ENTER(); uint32_t count; sai_queue_deadlock_notification_data_t *qdeadlockevent = NULL; sai_deserialize_queue_deadlock_ntf(data, count, &qdeadlockevent); process_on_queue_deadlock_event(count, qdeadlockevent); sai_deserialize_free_queue_deadlock_ntf(count, qdeadlockevent); } void NotificationProcessor::handle_port_state_change( _In_ const std::string &data) { SWSS_LOG_ENTER(); uint32_t count; sai_port_oper_status_notification_t *portoperstatus = NULL; sai_deserialize_port_oper_status_ntf(data, count, &portoperstatus); process_on_port_state_change(count, portoperstatus); sai_deserialize_free_port_oper_status_ntf(count, portoperstatus); } void NotificationProcessor::handle_port_host_tx_ready_change( _In_ const std::string &data) { SWSS_LOG_ENTER(); sai_object_id_t port_id; sai_object_id_t switch_id; sai_port_host_tx_ready_status_t host_tx_ready_status; sai_deserialize_port_host_tx_ready_ntf(data, switch_id, port_id, host_tx_ready_status); process_on_port_host_tx_ready_change(switch_id, port_id, &host_tx_ready_status); } void NotificationProcessor::handle_bfd_session_state_change( _In_ const std::string &data) { SWSS_LOG_ENTER(); uint32_t count; sai_bfd_session_state_notification_t *bfdsessionstate = NULL; sai_deserialize_bfd_session_state_ntf(data, count, &bfdsessionstate); process_on_bfd_session_state_change(count, bfdsessionstate); sai_deserialize_free_bfd_session_state_ntf(count, bfdsessionstate); } void NotificationProcessor::handle_switch_asic_sdk_health_event( _In_ const std::string &data) { SWSS_LOG_ENTER(); sai_object_id_t switch_id; sai_switch_asic_sdk_health_severity_t severity; sai_timespec_t timestamp; sai_switch_asic_sdk_health_category_t category; sai_switch_health_data_t health_data; sai_u8_list_t description; sai_deserialize_switch_asic_sdk_health_event(data, switch_id, severity, timestamp, category, health_data, description); process_on_switch_asic_sdk_health_event(switch_id, severity, timestamp, category, health_data, description); sai_deserialize_free_switch_asic_sdk_health_event(description); } void NotificationProcessor::handle_switch_shutdown_request( _In_ const std::string &data) { SWSS_LOG_ENTER(); sai_object_id_t switch_id; sai_deserialize_switch_shutdown_request(data, switch_id); process_on_switch_shutdown_request(switch_id); } void NotificationProcessor::handle_twamp_session_event( _In_ const std::string &data) { SWSS_LOG_ENTER(); uint32_t count; sai_twamp_session_event_notification_data_t *twampsessionevent = NULL; sai_deserialize_twamp_session_event_ntf(data, count, &twampsessionevent); process_on_twamp_session_event(count, twampsessionevent); sai_deserialize_free_twamp_session_event_ntf(count, twampsessionevent); } void NotificationProcessor::processNotification( _In_ const swss::KeyOpFieldsValuesTuple& item) { SWSS_LOG_ENTER(); m_synchronizer(item); } void NotificationProcessor::syncProcessNotification( _In_ const swss::KeyOpFieldsValuesTuple& item) { SWSS_LOG_ENTER(); std::string notification = kfvKey(item); std::string data = kfvOp(item); if (notification == SAI_SWITCH_NOTIFICATION_NAME_SWITCH_STATE_CHANGE) { handle_switch_state_change(data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT) { handle_fdb_event(data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_NAT_EVENT) { handle_nat_event(data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_PORT_STATE_CHANGE) { handle_port_state_change(data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_PORT_HOST_TX_READY) { handle_port_host_tx_ready_change(data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_SWITCH_SHUTDOWN_REQUEST) { handle_switch_shutdown_request(data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_SWITCH_ASIC_SDK_HEALTH_EVENT) { handle_switch_asic_sdk_health_event(data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_QUEUE_PFC_DEADLOCK) { handle_queue_deadlock(data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_BFD_SESSION_STATE_CHANGE) { handle_bfd_session_state_change(data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_TWAMP_SESSION_EVENT) { handle_twamp_session_event(data); } else { SWSS_LOG_ERROR("unknown notification: %s", notification.c_str()); } } void NotificationProcessor::ntf_process_function() { SWSS_LOG_ENTER(); std::mutex ntf_mutex; std::unique_lock<std::mutex> ulock(ntf_mutex); while (m_runThread) { m_cv.wait(ulock); // this is notifications processing thread context, which is different // from SAI notifications context, we can safe use syncd mutex here, // processing each notification is under same mutex as processing main // events, counters and reinit swss::KeyOpFieldsValuesTuple item; while (m_notificationQueue->tryDequeue(item)) { processNotification(item); } } } void NotificationProcessor::startNotificationsProcessingThread() { SWSS_LOG_ENTER(); m_runThread = true; m_ntf_process_thread = std::make_shared<std::thread>(&NotificationProcessor::ntf_process_function, this); } void NotificationProcessor::stopNotificationsProcessingThread() { SWSS_LOG_ENTER(); m_runThread = false; m_cv.notify_all(); if (m_ntf_process_thread != nullptr) { m_ntf_process_thread->join(); } m_ntf_process_thread = nullptr; } void NotificationProcessor::signal() { SWSS_LOG_ENTER(); m_cv.notify_all(); } std::shared_ptr<NotificationQueue> NotificationProcessor::getQueue() const { SWSS_LOG_ENTER(); return m_notificationQueue; }