src/outmdsd/DataReader.cc (205 lines of code) (raw):

extern "C" { #include <unistd.h> } #include <cassert> #include "ConcurrentMap.h" #include "DataReader.h" #include "Exceptions.h" #include "Trace.h" #include "TraceMacros.h" #include "SocketClient.h" using namespace EndpointLog; DataReader::DataReader( const std::shared_ptr<SocketClient> & sockClient, const std::shared_ptr<ConcurrentMap<LogItemPtr>> & dataCache ) : m_socketClient(sockClient), m_dataCache(dataCache) { assert(m_socketClient); } DataReader::~DataReader() { try { Stop(); } catch(const std::exception & ex) { Log(TraceLevel::Error, "~DataReader() exception: " << ex.what()); } catch(...) { } // no exception thrown from destructor } void DataReader::Stop() { ADD_INFO_TRACE; m_stopRead = true; } void DataReader::Run() { ADD_INFO_TRACE; try { std::string partialData; while(true) { if (!DoRead(partialData)) { break; } } } catch(const ReaderInterruptException&) { Log(TraceLevel::Info, "DataReader is interrupted. Abort reader thread."); } catch(const std::exception & ex) { Log(TraceLevel::Error, "DataReader unexpected exception: " << ex.what()); } } bool DataReader::DoRead( std::string & partialData ) { ADD_DEBUG_TRACE; char buf[512]; try { InterruptPoint(); auto readRtn = m_socketClient->Read(buf, sizeof(buf)-1); if (-1 == readRtn) { Log(TraceLevel::Debug, "SocketClient is stopped. Abort read."); return false; } if (readRtn > 0) { InterruptPoint(); buf[readRtn] = '\0'; partialData = ProcessData(partialData+buf); Log(TraceLevel::Debug, "DoRead partialData='" << partialData << "'."); } } catch(const SocketException & ex) { Log(TraceLevel::Info, "SocketException " << ex.what()); } return true; } void DataReader::InterruptPoint() const { if (m_stopRead) { throw ReaderInterruptException(); } } std::string DataReader::ProcessData( const std::string & str ) { ADD_DEBUG_TRACE; Log(TraceLevel::Debug, "ProcessData: '" << str << "'."); if (str.empty()) { return std::string(); } auto dPos = str.find_last_of('\n'); if (std::string::npos == dPos) { return str; } std::istringstream iss(str); std::string item; while(std::getline(iss, item, '\n')) { if (!iss.eof()) { ProcessItem(item); } } if (!item.empty() && dPos == (str.size()-1)) { ProcessItem(item); return std::string(); } return str.substr(dPos+1); } void DataReader::ProcessItem( const std::string& item ) { ADD_DEBUG_TRACE; if (item.empty()) { Log(TraceLevel::Warning, "unexpected empty ack item found."); return; } m_nTagsRead++; Log(TraceLevel::Debug, "Got item='" << item << "'"); auto p = item.find(':'); if (p == std::string::npos) { ProcessTag(item); } else { auto tag = item.substr(0, p); auto ackStatus = item.substr(p+1); ProcessTag(tag, ackStatus); } } void DataReader::ProcessTag( const std::string & tag ) { if (tag.empty()) { Log(TraceLevel::Warning, "unexpected empty tag found."); return; } if (m_dataCache) { if (1 != m_dataCache->Erase(tag)) { Log(TraceLevel::Warning, "tag '" << tag << "' is not found in backup cache"); } } } static std::unordered_map<std::string, std::string> & GetAckStatusMap() { static std::unordered_map<std::string, std::string> m = { { "0", "ACK_SUCCESS" }, { "1", "ACK_FAILED" }, { "2", "ACK_UNKNOWN_SCHEMA_ID" }, { "3", "ACK_DECODE_ERROR" }, { "4", "ACK_INVALID_SOURCE" }, { "5", "ACK_DUPLICATE_SCHEMA_ID" } }; return m; } static std::string GetAckStatusStr( const std::string & ackCode ) { auto m = GetAckStatusMap(); auto item = m.find(ackCode); if (item == m.end()) { return "Unknown-ACK-CODE"; } return item->second; } void DataReader::ProcessTag( const std::string & tag, const std::string & ackStatus ) { if (tag.empty()) { Log(TraceLevel::Warning, "unexpected empty tag found"); return; } if (ackStatus.empty()) { Log(TraceLevel::Warning, "unexpected empty ack status string found"); return; } if ("0" != ackStatus) { auto statusStr = GetAckStatusStr(ackStatus); Log(TraceLevel::Error, "unexpected mdsd ack status: " << statusStr << ", tag '" << tag << "'" ); } else { // Only remove item from cache if ack status is 0 (Success) if (m_dataCache) { if (1 != m_dataCache->Erase(tag)) { Log(TraceLevel::Warning, "tag '" << tag << "' is not found in backup cache"); } } } }