host/cxpslib/Telemetry/cxpstelemetrylogger.cpp (422 lines of code) (raw):
#include <listfile.h>
#include "cxpstelemetrylogger.h"
#include "cxps.h"
namespace CxpsTelemetry
{
// Static data member definitions
// NOTE(review): non-local objects defined in one translation unit are dynamically
// initialized in their order of definition, so s_Instance is constructed before every
// other static defined below. Confirm that the CxpsTelemetryLogger constructor (and the
// constructors of its data members) don't read any of them.
CxpsTelemetryLogger CxpsTelemetryLogger::s_Instance;
// System identifiers shared by all the telemetry rows. The strings start out empty and the
// process id starts at 0; presumably filled in by CxpsTelemetryRowBase::RefreshSystemIdentifiers(),
// which Run() in this file calls before writing each file - TODO confirm.
std::string CxpsTelemetryRowBase::s_biosId;
std::string CxpsTelemetryRowBase::s_fabType;
std::string CxpsTelemetryRowBase::s_psHostId;
std::string CxpsTelemetryRowBase::s_psAgentVer;
uint64_t CxpsTelemetryRowBase::s_psProcessId = 0;
// Default-constructed values, defined once here.
boost_pt::ptime Defaults::s_defaultPTime; // == Default construction
SteadyClock::time_point Defaults::s_defaultTimePoint; // == time_since_epoch() == 0
chrono::nanoseconds Defaults::s_zeroTime; // == chrono::nanoseconds::zero()
// Static cache row objects, one per row type of the device level and host level maps.
// NOTE(review): how these objects are used (and whether access to them is synchronized)
// isn't visible in this file - see the owning map classes.
SourceDiffTelemetryRow DeviceLevelTelemetryMap::s_sourceDiffTel_CacheObj;
SourceResyncTelemetryRow DeviceLevelTelemetryMap::s_sourceResyncTel_CacheObj;
SourceGeneralTelemetryRow HostLevelTelemetryMap::s_sourceGenTel_CacheObj;
// Building blocks of the telemetry file names. Initialize() combines them into
//   <table name>_current.json             - the file being written
//   <table name>_completed_*.json         - search pattern for the finished files
namespace FileAttributes
{
    // Table name; leading part of every telemetry file name.
    const std::string CxpsTransportTableName = "InMageTelemetryPSTransport";

    // Separator between the table name and the unique part of a completed file name.
    const std::string CompletedFilePart = "_completed_";

    // Suffix of the table name for the file that is currently being written.
    const std::string CurrentFileSuffix = "_current";

    // Extension shared by the current and the completed files.
    const std::string Extension = ".json";
}
class MdsLogUnit
{
public:
void serialize(JSON::Adapter &adapter)
{
JSON::Class root(adapter, "MdsLogUnit", false);
JSON_T_KV_PRODUCER_ONLY(adapter, "Map", m_rowToConvert);
}
static void ConvertTelemetryRowToMARSFormat(CxpsTelemetryRowBase &rowToConvert, std::string &fileRow)
{
CSMode csMode = GetCSMode();
if (csMode == CS_MODE_RCM)
{
fileRow = JSON::producer<CxpsTelemetryRowBase>::convert(rowToConvert);
}
else
{
MdsLogUnit tempLogUnit(rowToConvert);
fileRow = JSON::producer<MdsLogUnit>::convert(tempLogUnit);
}
}
private:
CxpsTelemetryRowBase &m_rowToConvert;
MdsLogUnit(CxpsTelemetryRowBase &rowToConvert)
: m_rowToConvert(rowToConvert)
{
}
};
// Configures the logger. Must be called before Start().
//
// psId                 - identifier of the process server, stored in m_psId.
// folderPath           - output folder of the telemetry files; must contain a root directory.
// writeInterval        - time the logger thread sleeps between two writes.
// maxCompletedFilesCnt - once this many completed files are found in the folder, the
//                        periodic (non-final) writes are skipped.
//
// Throws std::runtime_error if folderPath has no root directory. The path is validated
// before any member is modified, so that a failed (re)initialization leaves the logger
// exactly in the state it was in before the call.
void CxpsTelemetryLogger::Initialize(
    const std::string &psId,
    const boost_fs::path &folderPath,
    boost::chrono::seconds writeInterval,
    int maxCompletedFilesCnt)
{
    namespace FA = FileAttributes;

    if (!folderPath.has_root_directory())
        throw std::runtime_error("CxpsTelemetryLogger output folder path is invalid : " + folderPath.string());

    m_psId = psId;
    m_folderPath = folderPath;
    m_writeInterval = writeInterval;
    m_maxCompletedFilesCnt = maxCompletedFilesCnt;

    // InMageTelemetryPSTransport_completed_
    m_completedFileNamePrefix = FA::CxpsTransportTableName + FA::CompletedFilePart;

    // <folderPath>/InMageTelemetryPSTransport_completed_*.json
    m_completedFileSearchPattern = m_folderPath /
        (m_completedFileNamePrefix + "*" + FA::Extension);

    // InMageTelemetryPSTransport_current.json
    std::string currentFileName =
        FA::CxpsTransportTableName + FA::CurrentFileSuffix + FA::Extension;

    // <folderPath>/InMageTelemetryPSTransport_current.json
    m_currentFilePath = m_folderPath / currentFileName;

    // Set last, so that the flag is raised only when every path member is consistent.
    m_isInitialized = true;
}
// Spawns the logger thread, which executes Run().
//
// Throws std::logic_error if the logger hasn't been initialized, or if it has already been
// started (and not stopped since). Any exception raised while creating the thread
// propagates to the caller.
void CxpsTelemetryLogger::Start()
{
    if (!m_isInitialized)
    {
        BOOST_ASSERT(false);
        throw std::logic_error("CxpsTelemetryLogger is started without initialization");
    }

    // Enforce the single-thread contract in release builds too (BOOST_ASSERT alone may be
    // compiled out): resetting m_thread here would destroy the handle of a thread that is
    // still running and would spawn a second writer working on the same files.
    if (m_thread)
    {
        BOOST_ASSERT(false);
        throw std::logic_error("CxpsTelemetryLogger is already started");
    }

    // On thread spawn failure, the exception would stop the server. So, it's
    // guaranteed that this thread would always be running, as long as the
    // server is running.
    m_thread.reset(
        new boost::thread(boost::bind(&CxpsTelemetryLogger::Run, this)));
}
// Stops the logger thread, if one was started: interrupts it, waits for it to finish and
// releases the thread object. Does nothing if there's no thread.
void CxpsTelemetryLogger::Stop()
{
    // Nothing to stop, if the thread was never created (or has been stopped already).
    if (!m_thread)
        return;

    BOOST_ASSERT(m_isInitialized);

    // Signal completion. Run() handles the interruption of its sleep by doing one last
    // write before returning; wait for that to finish.
    m_thread->interrupt();
    m_thread->join();

    m_thread.reset();
}
void CxpsTelemetryLogger::Run()
{
bool hasLoggerStopped = false;
size_t dbgIgnoredExceptionCnt = 0;
CXPS_LOG_ERROR_INFO("Starting CxpsTelemetryLogger thread ");
while (!hasLoggerStopped)
{
try
{
try
{
boost::this_thread::sleep_for(m_writeInterval);
}
catch (boost::thread_interrupted)
{
CXPS_LOG_ERROR_INFO("Stopping CxpsTelemetryLogger thread "
"(after flushing the remaining telemetry data)");
hasLoggerStopped = true;
// Write the remainder data into the next file and quit.
// TODO-SanKumar-1710: Assert that at the final collection,
// there's no pending data logged, since all the server threads
// are awaited before stopping telemetry logger. We should be
// able to account for every data till this point.
}
// If any of the identifiers couldn't be retrieved, we fill in
// empty value and continue. Try to retrieve failed ones before
// writing each file.
CxpsTelemetryRowBase::RefreshSystemIdentifiers();
// TODO-SanKumar-1710: Listen for cancellation within this long running call (or)
// is it better to dump all the data that we have and then spinning off cxps server,
// as currently implemented.
this->WriteDataToNextFile(hasLoggerStopped);
}
// TODO: Move this comment to serverctl.cpp (Actually it's not happening. Even getting the stopping log as of today)
// TODO-SanKumar-1709: Add a log for any exception.
// The problem is that simple log monitor thread might be closed, due
// to the thread join ordering in the stopservers(). To add logging
// to CxpsTelemetryLogger, split the monitoring and server threads
// joining and stop threads in the following order: server threads,
// telLogger and finally monitor threads.
GENERIC_CATCH_LOG_ACTION("CxpsTelemetryLogger run loop", dbgIgnoredExceptionCnt++)
}
}
// Writes the pending telemetry rows into the "current" file and, if at least one row has been
// written, renames that file to a "completed" file name.
//
// isFinalWrite - when true, the write is attempted even if the number of completed files
//                found in the folder has already reached m_maxCompletedFilesCnt.
//
// Throws std::runtime_error if checking for / creating the output folder reports an error.
// Exceptions raised while opening the file or writing the rows are handled by the
// GENERIC_CATCH_LOG_IGNORE following the try block. The rename at the end is outside of that
// try block, so an exception raised by it propagates to the caller.
void CxpsTelemetryLogger::WriteDataToNextFile(bool isFinalWrite)
{
// Poll for the files and see if there's possibility of adding one more file.
int completedFileCnt = 0;
{
std::string fileList;
const bool IncludeFileSize = false;
ListFile::listFileGlob(m_completedFileSearchPattern, fileList, IncludeFileSize);
// The number of completed files is taken as the number of '\n' characters in the list
// (presumably one line per matching file - TODO confirm against listFileGlob).
for (size_t matchInd = fileList.find('\n'); matchInd != std::string::npos; completedFileCnt++)
matchInd = fileList.find('\n', matchInd + 1);
}
if (!isFinalWrite && completedFileCnt >= m_maxCompletedFilesCnt)
{
// The data won't be flushed and so the interval of the data would continue to expand till
// the next write. We actually don't lose any data.
// TODO-SanKumar-1711: Should we add a counter for this and ship it
// as part of the logs for better telemetry debugging?
return;
}
// Ensure that the output folder exists. errorCode holds the result of the last of the two
// calls that has been executed.
boost::system::error_code errorCode;
if (!boost_fs::exists(m_currentFilePath.parent_path(), errorCode))
boost_fs::create_directories(m_currentFilePath.parent_path(), errorCode);
if (errorCode)
throw std::runtime_error(errorCode.message());
// Start a new print session and a new file.
++m_printSessionId;
m_curFileRowCount = 0;
try
{
bool isRowObjUnusable;
bool shouldContinueWritingToFile;
std::ofstream currentFileOutStream;
// Make the stream throw on open / write failures, instead of failing silently.
currentFileOutStream.exceptions(std::ofstream::failbit | std::ofstream::badbit);
// Any content left over in the current file is discarded (trunc).
currentFileOutStream.open(m_currentFilePath.string().c_str(), std::ofstream::trunc | std::ofstream::binary);
CxpsTelemetryRowBasePtr nextTelRowToPrint;
CxpsTelemetryRowBase *cacheObj;
// 1. Rows of the host telemetry map. The loop ends when a null row pointer is returned.
while (nextTelRowToPrint = m_hostTelemetryMap.GetNextTelemetryRowToPrint(m_printSessionId, &cacheObj))
{
// TODO-SanKumar-1710: For a moment in between the above and below calls, the lock
// could be owned by another request completion and that could probably invalidate
// the row object. So, we should overcome that by acquiring lock, validating the
// object, checking if not empty here, then write the row and finally releasing the
// lock. Applies for all the row objects.
if (!nextTelRowToPrint->IsEmpty())
{
shouldContinueWritingToFile = this->WriteRowToFile(
currentFileOutStream, *nextTelRowToPrint, *cacheObj, isRowObjUnusable);
// TODO-SanKumar-1711: Currently, we don't know the key to the
// obj in the host tel map. So, we can't regenerate a valid obj,
// by calling GetValidRow(key) - see GlobalTelemetry reset below.
// Doing that operation in this thread avoids the load on the
// server worker thread.
//if(isRowObjUnusable) { Reset here... }
if (!shouldContinueWritingToFile)
goto WritesCompleted;
}
}
// 2. The global telemetry row.
GlobalCxpsTelemetryRowPtr globalTel = PruneOrGetGlobalTelemetry();
if (globalTel && !globalTel->IsEmpty())
{
shouldContinueWritingToFile = this->WriteRowToFile(
currentFileOutStream, *globalTel, m_cacheGlobalTelRow, isRowObjUnusable);
// Refresh the telemetry object (unless already done by another thread)
if (isRowObjUnusable)
GetValidGlobalTelemetry();
if (!shouldContinueWritingToFile)
goto WritesCompleted;
}
// 3. If nothing has been written so far, a single empty row.
if (m_curFileRowCount == 0)
{
// No data. Write an empty message.
// We have to generate an empty telemetry row to cut from, since we can't persist
// empty row's object across iterations. Say at itr1 - no data, itr2 - data and
// itr3 - no data. If a persisted object is used, the empty row of itr3 would have
// logger time from itr1 to itr3, where it must be itr2 to itr3.
EmptyCxpsTelemetryRow currEmptyTelemetry(GetLastRowUploadTime());
shouldContinueWritingToFile = this->WriteRowToFile(
currentFileOutStream, currEmptyTelemetry, m_cacheEmptyTelRow, isRowObjUnusable);
if (!shouldContinueWritingToFile)
goto WritesCompleted;
}
}
// TODO-SanKumar-1710: Any exception caught is forgotten after this point.
// Should we point that somewhere? Is the message drop logged already by
// the above block enough?
// TODO-SanKumar-1711: If there's low disk space, we would end up eating
// more disk space with this log, which will one line per each file.
GENERIC_CATCH_LOG_IGNORE("Writing telemetry data to file");
// Reached by falling through, as well as by the gotos out of the try block above. In both
// cases currentFileOutStream has gone out of scope by now, i.e. the file has been closed.
WritesCompleted:
// On both success and failure, transport as many rows that have been written successfully.
if (m_curFileRowCount != 0)
{
// Rename the file - following similar strategy as the PS telemetry
// files generated by tmansvc.
// InMageTelemetryPSTransport_completed_1506512354.json
// NOTE(review): going by the name of the helper, the unique part has a granularity of one
// second. Two writes completing within the same second (for ex, a final write issued
// right after a periodic one) would then target the same file name - confirm whether
// that's possible and how rename behaves in that case.
// NOTE(review): if the rename fails, the rows stay in the current file, which is
// truncated on the next write - confirm whether that loss needs to be reported.
std::string genCompletedFileName =
m_completedFileNamePrefix + Utils::GetTimeInSecSinceEpoch1970() + FileAttributes::Extension;
boost_fs::rename(m_currentFilePath, m_folderPath / genCompletedFileName);
}
}
// Retrieves the previous window's data of rowObj into cacheObj (GetPrevWindowRow), serializes
// cacheObj and appends it as a line to the given stream.
//
// ofstream         - output stream of the telemetry file being written.
// rowObj           - row object holding the collected data.
// cacheObj         - scratch row object; cleared at the beginning and then filled from rowObj.
// isRowObjUnusable - [out] true, if rowObj has been found invalid after a failed operation
//                    (or) if an unexpected failure has been hit; false otherwise.
//
// Returns true, if the caller can continue writing further rows to the file in this
// iteration; false, if serializing / writing to the file has failed (or) an unexpected
// exception has been caught.
//
// Whenever the data of the row is going to be lost, the drop is reported through
// ReportRowObjDrop().
bool CxpsTelemetryLogger::WriteRowToFile(
std::ofstream &ofstream, CxpsTelemetryRowBase &rowObj, CxpsTelemetryRowBase &cacheObj, bool &isRowObjUnusable)
{
// Both the flags below are assigned on every path leading to the Cleanup label.
bool shouldContinueWritingToFile;
bool willDataBeDropped;
isRowObjUnusable = false;
try
{
cacheObj.Clear();
boost_pt::ptime currSystemTime = boost_pt::microsec_clock::universal_time();
if (!rowObj.GetPrevWindowRow(cacheObj, currSystemTime))
{
// If the failed operation without invalidating the row object, this object could be
// continued to be used and this data would be uploaded in the next iteration.
// If the failed operation invalidated the row object, then the object is unusable.
shouldContinueWritingToFile = true;
isRowObjUnusable = !rowObj.IsValid();
willDataBeDropped = isRowObjUnusable;
goto Cleanup;
}
BOOST_ASSERT(rowObj.IsValid() && cacheObj.IsValid());
// Don't update the global sequence number yet.
cacheObj.SetSeqNumber(m_seqNum.load(boost::memory_order_acquire) + 1);
try
{
MdsLogUnit::ConvertTelemetryRowToMARSFormat(cacheObj, m_cacheFileLine);
// std::endl also flushes the stream after each row.
// NOTE(review): WriteDataToNextFile() makes this stream throw on failbit / badbit. The
// flush per row presumably lets a failed write be noticed before the row is counted
// as written below - confirm before replacing std::endl with '\n'.
ofstream << m_cacheFileLine << std::endl;
// Following disconnected increment using the atomic object, in order to
// support update of the seqNum, if an object is dropped.
m_seqNum.fetch_add(1, boost::memory_order_release);
++m_curFileRowCount;
SetLastRowUploadTime(currSystemTime);
shouldContinueWritingToFile = true;
isRowObjUnusable = false;
willDataBeDropped = false;
goto Cleanup;
}
GENERIC_CATCH_LOG_IGNORE("WriteRowToFile - File write");
// The statements from here till the end of the outer try block are reached only after
// the catch above has handled a failure (the success path leaves through the goto).
// Stop writing to the file anymore for this iteration, since we hit an
// error while serializing and writing to the file.
shouldContinueWritingToFile = false;
// Since there has been a failure in serializing / writing to the file,
// add the data back to the row object.
if (rowObj.IsValid() && cacheObj.IsValid())
{
if (rowObj.AddBackPrevWindow(cacheObj))
{
BOOST_ASSERT(rowObj.IsValid() && cacheObj.IsValid());
// The data has been successfully added back to the row object.
// That would be uploaded in the next iteration.
isRowObjUnusable = false;
willDataBeDropped = false;
}
else
{
// Failed to add the data back to the row.
// On failure to addBack(), if the row obj is valid, we should continue using it,
// since new data could've been added to this object after getPrev() in this stack.
isRowObjUnusable = !rowObj.IsValid();
// Even if the row object is valid, we will end up missing the data in cacheObj.
// (unlike getPrev() failure in the beginning of this method)
willDataBeDropped = true;
// Special case: The old data couldn't be added back to the row, even if it's
// valid with the new data.
// So, the object would only have data starting from the current time.
if (rowObj.IsValid())
SetLastRowUploadTime(currSystemTime);
}
goto Cleanup;
}
// Not expected to reach this point. But since unhandled, assume the worst.
BOOST_ASSERT(false);
isRowObjUnusable = true;
willDataBeDropped = true;
goto Cleanup;
}
// Unexpected catch. Assume the worst.
GENERIC_CATCH_LOG_ACTION("WriteRowToFile", BOOST_ASSERT(false); isRowObjUnusable = true; willDataBeDropped = true; shouldContinueWritingToFile = false);
Cleanup:
if (willDataBeDropped)
{
// Message type of obj wouldn't be corrupted at any cost.
this->ReportRowObjDrop(rowObj.GetMessageType());
}
return shouldContinueWritingToFile;
}
// Accounts for a telemetry row whose data has been dropped: reports the drop of the given
// message type on the global telemetry (if a valid global telemetry object is available)
// and lapses the sequence number.
void CxpsTelemetryLogger::ReportRowObjDrop(MessageType msgType)
{
    // Increment the counter in the global telemetry
    if (GlobalCxpsTelemetryRowPtr validGlobalTel = GetValidGlobalTelemetry())
        validGlobalTel->ReportMessageDrop(msgType);

    // Lapsed irrespective of whether the drop could be reported above.
    LapseSequenceNumber();
}
// Reports an error on the global telemetry and lapses the sequence number.
//
// functionName - name of the member function to invoke on the global telemetry row.
// ...          - arguments forwarded to that member function.
//
// If no valid global telemetry object is available, only the sequence number is lapsed.
// Expects GetValidGlobalTelemetry() and LapseSequenceNumber() to be callable unqualified at
// the point of expansion.
//
// The body is wrapped in do { } while (0), so that the macro behaves as one statement that
// takes a trailing semicolon - even as the unbraced branch of an if / else. The local name
// is deliberately unusual, so that it neither hides a variable of the expanding function nor
// captures an identifier used in the forwarded arguments.
#define GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(functionName, ...) \
    do { \
        GlobalCxpsTelemetryRowPtr globTelPtrForErrReport_ = GetValidGlobalTelemetry(); \
        if (globTelPtrForErrReport_) \
            globTelPtrForErrReport_->functionName(__VA_ARGS__); \
        \
        LapseSequenceNumber(); \
    } while (0)
// Free function wrapper: returns the last row upload time of the singleton logger.
boost_pt::ptime GetLastRowUploadTime()
{
    CxpsTelemetryLogger &telLogger = CxpsTelemetryLogger::GetInstance();

    return telLogger.GetLastRowUploadTime();
}
void AddRequestDataToTelemetryLogger(const RequestTelemetryData &reqTelData)
{
CxpsTelemetryLogger::GetInstance().AddRequestDataToTelemetryLogger(reqTelData);
}
// Adds the telemetry data of one request to the matching telemetry row.
//
// Flow, as implemented below:
//  - Data marked with an error (m_dataError) is only counted as an error on the global
//    telemetry.
//  - File level data: the file path is parsed. Diff sync and resync files are added to the
//    row of the parsed host id + device; the other listed file types (and an empty file
//    path) fall back to the session level.
//  - Session level data is added to the source general row of the host.
//  - Server level data is added to the global telemetry row.
//
// Every failure path reports the failure on the global telemetry and lapses the sequence
// number through GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM. Exceptions are handled by the
// GENERIC_CATCH_LOG_ACTION at the end, which does the same reporting.
void CxpsTelemetryLogger::AddRequestDataToTelemetryLogger(const RequestTelemetryData &reqTelData)
{
try
{
if (reqTelData.m_dataError != ReqTelErr_None)
{
// No data in here could be trusted.
// TODO-SanKumar-1711: We could probably trial and error by testing values and see
// if anything could be salvaged out of this. (For ex, although we called out there's
// error in telemetry data, host id isn't affected in the current workflow).
// TODO-SanKumar-1711: Should we add this data to SourceGeneral telemetry, so we get
// the type of file (or) at least the failing host for easier debugging? (The data
// representation will get complex).
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportRequestTelemetryDataError, reqTelData.m_dataError);
return;
}
// Outputs of the file path parsing. These stay empty, if the path isn't parsed.
std::string parsedHostId;
std::string parsedDevice;
std::string parsedFileName;
// Local copy of the level, since it could be lowered to the session level below.
RequestTelemetryDataLevel dataLevel = reqTelData.m_dataLevel;
FileType retrievedFileType = FileType_Invalid;
if (dataLevel == ReqTelLevel_File)
{
if (reqTelData.m_filePath.empty())
{
BOOST_ASSERT(false);
retrievedFileType = FileType_Invalid;
dataLevel = ReqTelLevel_Session; // Fallback to session level logging
}
else
{
// Trusting the parsed host id instead of the session advertised host id,
// while uploading source file telemetries.
if (CS_MODE_RCM == GetCSMode())
{
retrievedFileType = SourceFilePathParser::GetCxpsFileTypeInRcmMode(
reqTelData.m_filePath, parsedHostId, parsedDevice, parsedFileName);
}
else
{
retrievedFileType = SourceFilePathParser::GetCxpsFileType(
reqTelData.m_filePath, parsedHostId, parsedDevice, parsedFileName);
}
switch (retrievedFileType)
{
case FileType_DiffSync:
{
// All the three parsed parts are needed to locate the row and to parse the file name.
if (parsedHostId.empty() || parsedDevice.empty() || parsedFileName.empty())
{
BOOST_ASSERT(false);
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_DiffEmptyMeta);
return;
}
BOOST_ASSERT(parsedHostId == reqTelData.m_hostId);
DiffSyncFileMetadata diffFileMetadata;
SourceFilePathParser::ParseDiffSyncFileName(parsedFileName, diffFileMetadata);
SourceDiffTelemetryRowPtr sourceDiffRow =
m_hostTelemetryMap.GetValidSourceDiffTelemetry(parsedHostId, parsedDevice);
if (!sourceDiffRow)
{
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_Alloc);
return;
}
if (!sourceDiffRow->AddToTelemetry(reqTelData.m_requestType, reqTelData, diffFileMetadata))
{
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_AddToDiffTel);
return;
}
return; // Successfully added the telemetry
}
case FileType_Resync:
{
// Same steps as the diff sync case, using the resync metadata and row types.
if (parsedHostId.empty() || parsedDevice.empty() || parsedFileName.empty())
{
BOOST_ASSERT(false);
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_ResyncEmptyMeta);
return;
}
BOOST_ASSERT(parsedHostId == reqTelData.m_hostId);
ResyncFileMetadata resyncFileMetadata;
SourceFilePathParser::ParseResyncFileName(parsedFileName, resyncFileMetadata);
SourceResyncTelemetryRowPtr sourceResyncRow =
m_hostTelemetryMap.GetValidSourceResyncTelemetry(parsedHostId, parsedDevice);
if (!sourceResyncRow)
{
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_Alloc);
return;
}
if (!sourceResyncRow->AddToTelemetry(reqTelData.m_requestType, reqTelData, resyncFileMetadata))
{
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_AddToResyncTel);
return;
}
return; // Successfully added the telemetry
}
case FileType_Log:
case FileType_Telemetry:
case FileType_ChurStat:
case FileType_ThrpStat:
case FileType_TstData:
case FileType_Unknown:
case FileType_Invalid:
case FileType_InternalErr:
dataLevel = ReqTelLevel_Session; // Fallback to session level logging for these types
break;
default:
// A file type value that isn't handled by this switch.
BOOST_ASSERT(false);
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_AddTelUnknFType);
return;
}
}
}
// Not an else-if of the file level check above, since the file level handling could have
// lowered dataLevel to the session level.
if (dataLevel == ReqTelLevel_Session)
{
// Prefer parsedHostId, if empty, use the host id provided at login.
const std::string &sessionHostId = !parsedHostId.empty() ? parsedHostId : reqTelData.m_hostId;
if (sessionHostId.empty())
{
BOOST_ASSERT(false);
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_SrcGenEmptyMeta);
return;
}
SourceGeneralTelemetryRowPtr sourceGenPtr =
m_hostTelemetryMap.GetValidSourceGeneralTelemetry(sessionHostId);
if (!sourceGenPtr)
{
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_Alloc);
return;
}
// retrievedFileType is FileType_Invalid here, unless a file path has been parsed above.
if (!sourceGenPtr->AddToTelemetry(retrievedFileType, reqTelData.m_requestType, reqTelData))
{
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_AddToSrcGenTel);
return;
}
return; // Successfully added the telemetry
}
if (dataLevel == ReqTelLevel_Server)
{
// Currently, all server level telemetry can come only on failures (like, not logged in).
BOOST_ASSERT(!reqTelData.m_hasRespondedSuccess && reqTelData.m_requestFailure != RequestFailure_Success);
// Note: the reporting macro used below declares its own variable named globTelPtr in a
// nested block, which hides this one within the expansion.
GlobalCxpsTelemetryRowPtr globTelPtr = GetValidGlobalTelemetry();
if (!globTelPtr)
{
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_Alloc);
return;
}
if (!globTelPtr->AddToTelemetry(reqTelData))
{
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_AddToGlobalTel);
return;
}
return; // Successfully added the telemetry
}
// dataLevel is none of the levels handled above.
BOOST_ASSERT(false); // Shouldn't reach here!
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_AddTelReachedEof);
return;
}
GENERIC_CATCH_LOG_ACTION("AddRequestDataToTelemetryLogger",
GLOBAL_TEL_REPORT_ERR_N_LASPSE_SEQ_NUM(ReportTelemetryFailure, TelFailure_AddToTelUnknErr));
}
}