host/cxpslib/Telemetry/RequestTelemetryData.h (432 lines of code) (raw):
#ifndef CXPS_REQUEST_TELEMETRY_DATA_H
#define CXPS_REQUEST_TELEMETRY_DATA_H
#include <string>
#include <limits>

#include <boost/atomic.hpp>

#include "CxpsPerfCounters.h"
namespace CxpsTelemetry
{
// Forward declarations
class CxpsTelemetryLogger;
class FileTelemetryData;
class CxpsTelemetryRowBase;
class RequestTelemetryData;
void AddRequestDataToTelemetryLogger(const RequestTelemetryData&);
// Accumulates per-request telemetry (timings, byte counts, failure state) for a
// single CXPS session. One instance lives for the lifetime of a session; it is
// filled in as the request progresses and flushed to the telemetry logger when
// the request completes (see RequestCompleted()/Clear()).
//
// Threading: all members are updated serially by the session's server thread,
// EXCEPT m_requestFailure, which may also be set by the timeout thread and is
// therefore an atomic (see SetRequestFailure()).
class RequestTelemetryData
{
    // TODO-SanKumar-1710: Make only the directly communicating member functions as a friend.
    friend class CxpsTelemetryLogger;
    friend class FileTelemetryData;
    friend class CxpsTelemetryRowBase;

private:
    // Identifies which timed operation (if any) is currently in flight, so
    // that Start*/Completing* calls can be validated for correct pairing.
    enum RequestOperation
    {
        RequestOp_None = 0,
        RequestOp_NwReadReqBlock,       // blocking network read that waits for a new request
        RequestOp_NwRead,               // regular network read
        RequestOp_NwReadDummyFromBuf,   // replayed from a buffer; excluded from stats
        RequestOp_NwWrite,              // regular network write
        RequestOp_NwWriteDummyFromBuf,  // replayed from a buffer; excluded from stats
        RequestOp_FRead,                // file read
        RequestOp_FWrite,               // file write
        RequestOp_FFlush,               // file flush
        RequestOp_Compress,             // file (de)compression
        RequestOp_ReqSpecificOp         // request-type-specific work (see StartingOp())
    };

    // Session-scoped properties; survive Clear() for the session's lifetime.
    std::string m_sessionId;
    bool m_isUsingSsl;

    // How much context has been acquired so far: server -> session (host id
    // known) -> file (file path known). File level steps back to session level
    // on Clear().
    RequestTelemetryDataLevel m_dataLevel;

    // TODO-SanKumar-1711: Make it atomic and compare and update only if not already set.
    // Or should we go with flags?
    // Records internal bookkeeping errors of this telemetry class itself
    // (mispaired timers, dangling ops, ...), not request failures.
    RequestTelemetryDataError m_dataError;

    RequestType m_requestType;
    RequestOperation m_currOperation;   // operation currently being timed, or RequestOp_None
    bool m_isDummyRW, m_isNewRequestBlockingRead;
    std::string m_hostId;
    std::string m_filePath;
    bool m_hasRespondedSuccess;
    bool m_isPutFileMoreData;           // true => putfile has more data coming; don't flush yet

    // May be written concurrently by the timeout thread; hence atomic.
    boost::atomic<RequestFailure> m_requestFailure;

    // Wall-clock bounds of the request (for reporting) and steady-clock bounds
    // (for duration math, immune to wall-clock jumps).
    boost_pt::ptime m_requestStartTime;
    boost_pt::ptime m_requestEndTime;
    SteadyClock::time_point m_requestStartTimePoint;
    SteadyClock::time_point m_requestEndTimePoint;

    int64_t m_numOfRequests;

    // Start instant of the operation currently being timed; default value
    // means "no timer running".
    SteadyClock::time_point m_durationCalculationStart;

    chrono::nanoseconds m_newRequestBlockTime;
    chrono::nanoseconds m_totalReqSpecificOpTime;

    CxpsPerfCounters m_perfCounters;

    // Returns normalized throughput: time taken (milliseconds) to transfer
    // 1 MB of data, with the byte count first rounded up to a 256 KB boundary.
    // For bytes <= 0 no data moved, so the per-MB time is unbounded: +infinity
    // is returned, which deterministically falls into the unbounded (last)
    // latency bucket. (Previously 0 bytes caused a division by zero; when the
    // elapsed time also rounded to 0 that produced NaN, which fails every
    // bucket comparison and silently mis-buckets the sample.)
    static double GetNormalizedThroughput(int64_t bytes, chrono::nanoseconds totalTimeTaken)
    {
        const int32_t _256KB = 256 * 1024;
        const int32_t _1MB = 1 * 1024 * 1024;

        if (bytes <= 0)
            return std::numeric_limits<double>::infinity();

        // Round the transfer size up to the next 256 KB boundary.
        double _256KBAlignedSizeBytes = (double)(bytes / _256KB + (bytes%_256KB == 0 ? 0 : 1)) * _256KB;
        double normalizedSizeInMB = _256KBAlignedSizeBytes / _1MB;
        double timeTakenInMilliSecs = (double)totalTimeTaken.count() / (boost::nano::den / boost::milli::den);

        return timeTakenInMilliSecs / normalizedSizeInMB; // Time(ms) taken per 1 MB
    }

    // Begins timing the given operation. Exactly one operation may be timed at
    // a time; violations are recorded in m_dataError (first error wins) and
    // the state is reset so telemetry collection can continue.
    void StartInternalTimer(RequestOperation operation)
    {
        if (m_currOperation != RequestOp_None)
        {
            BOOST_ASSERT(false);

            if (m_dataError == ReqTelErr_None)
                m_dataError = ReqTelErr_AnotherOpInProgress;

            m_currOperation = RequestOp_None; // reset
        }

        if (m_durationCalculationStart != Defaults::s_defaultTimePoint)
        {
            BOOST_ASSERT(false);

            if (m_dataError == ReqTelErr_None)
                m_dataError = ReqTelErr_TimerAlreadyStarted;

            m_durationCalculationStart = Defaults::s_defaultTimePoint; // reset
        }

        m_currOperation = operation;
        m_durationCalculationStart = SteadyClock::now();
    }

    // Ends timing of the given operation and returns the elapsed duration.
    // If the timer wasn't properly started for this operation, records the
    // mismatch in m_dataError and returns a zero duration.
    chrono::nanoseconds EndInternalTimer(RequestOperation operation)
    {
        SteadyClock::time_point startTime = m_durationCalculationStart;
        m_durationCalculationStart = Defaults::s_defaultTimePoint; // reset

        if (m_currOperation != operation)
        {
            BOOST_ASSERT(false);

            if (m_dataError == ReqTelErr_None)
                m_dataError = ReqTelErr_UnexpectedOpInProgress;

            startTime = Defaults::s_defaultTimePoint;
        }

        m_currOperation = RequestOp_None; // reset

        if (startTime == Defaults::s_defaultTimePoint)
        {
            BOOST_ASSERT(false);

            if (m_dataError == ReqTelErr_None)
                m_dataError = ReqTelErr_TimerNotStarted;

            return Defaults::s_zeroTime;
        }

        return SteadyClock::now() - startTime;
    }

    // Finalizes the current request: unless a successful putfile is awaiting
    // more data, stamps the end times, posts the collected data to the
    // telemetry logger and clears the per-request state.
    // internalSessionLogout is true when invoked from SessionLoggingOut().
    void RequestCompleted(bool internalSessionLogout)
    {
        BOOST_ASSERT(internalSessionLogout || // Any failure ended up logging out the session.
            m_isPutFileMoreData || // put file could've more data and so not completed.
            m_hasRespondedSuccess || // otherwise, we should've reached here only after server responded success to client.
            m_requestFailure == RequestFailure_ErrorInResponse); // unless there was an error in sending the response.

        if (m_requestFailure == RequestFailure_Success &&
            m_currOperation != RequestTelemetryData::RequestOp_None)
        {
            BOOST_ASSERT(false);
            m_dataError = ReqTelErr_DanglingCurrOp;
        }

        // Unless there has been a "successful" put request with "more data = true",
        // flush the collected data into telemetry.
        if (!(m_isPutFileMoreData && m_requestFailure == RequestFailure_Success))
        {
            if (m_dataLevel > ReqTelLevel_Server && m_numOfRequests <= 0)
            {
                // The request count is started at the beginning of a request
                // read and so there shouldn't be a case, where it's 0.

                // TODO-SanKumar-1711: There's a known case, where this issue
                // could arise. Currently, if the timeout thread runs in parallel
                // with the processing/completion of the request, it would
                // result in double posting of the request telemetry data,
                // while the second thread posting it would be uploading an
                // empty(just cleared) data. This should go away once the timeout
                // thread is synchronized with the server thread of the session.
                BOOST_ASSERT(false);
                m_dataError = ReqTelErr_NumReqsLE0;
            }

            m_requestEndTimePoint = SteadyClock::now();
            m_requestEndTime = boost_pt::microsec_clock::universal_time();

            AddRequestDataToTelemetryLogger(*this);

            Clear();
        }
        // else
        //     m_isPutFileMoreData == true.
        //     Don't clear any member, since the file put will be continued with subsequent requests.
    }

public:
    RequestTelemetryData()
        : m_dataLevel(ReqTelLevel_Server)
    {
        BOOST_STATIC_ASSERT(Tunables::FWriteLatencyBucketsCount >= 2); // At least one bounded and one unbounded bucket
        BOOST_ASSERT(Tunables::FWriteLatencyBuckets_ms[0] == 0);

        BOOST_STATIC_ASSERT(Tunables::NwReadLatencyBucketsCount >= 2); // At least one bounded and one unbounded bucket
        BOOST_ASSERT(Tunables::NwReadLatencyBuckets_ms[0] == 0);

        // TODO-SanKumar-1711: Write asserts to ensure the buckets are in increasing order

        // GUID-with-hyphens length, incl. NUL — avoids a realloc on first assignment.
        m_hostId.reserve(37);
        //m_filePath.reserve(256 * sizeof(wchar_t));

        Clear();
    }

    ~RequestTelemetryData()
    {
        // All collected data should have been flushed before destruction.
        BOOST_ASSERT(IsEmpty());
    }

    // Stores the session-lifetime properties. Safe to call multiple times.
    void SetSessionProperties(const std::string &sessionId, bool isUsingSsl)
    {
        try
        {
            m_isUsingSsl = isUsingSsl;
            m_sessionId = sessionId;
        }
        GENERIC_CATCH_LOG_IGNORE("SetSessionProperties");
    }

    // Timer methods

    // Brackets request-type-specific work; total time is accumulated in
    // m_totalReqSpecificOpTime.
    void StartingOp()
    {
        StartInternalTimer(RequestOp_ReqSpecificOp);
    }

    void CompletingOp()
    {
        m_totalReqSpecificOpTime += EndInternalTimer(RequestOp_ReqSpecificOp);
    }

    // Marks the start of the blocking network read that waits for a new
    // request. Caller must invoke StartingNwRead() immediately afterwards.
    void StartingNwReadForNewRequest()
    {
        BOOST_ASSERT(m_isPutFileMoreData || IsEmpty());
        BOOST_ASSERT(!m_isDummyRW && !m_isNewRequestBlockingRead);

        // Although a request isn't received until the corresponding
        // read completion is received, we can't increment this counter at
        // the completion. Because the accounting wouldn't tally, if
        // there's a failure at this read. (i.e. it will be incorrect as
        // num of req = 0, failed req = 1). So ++ing before receiving request.
        ++m_numOfRequests;

        m_isNewRequestBlockingRead = true;

        // StartingNwRead() will be called immediately after this.
    }

    // Marks the start of a "dummy" read served from an in-memory buffer;
    // such reads are timed but excluded from the network-read statistics.
    void StartingDummyNwReadFromBuffer(int64_t expectedBytesRead)
    {
        BOOST_ASSERT(!m_isDummyRW && !m_isNewRequestBlockingRead);

        // TODO-SanKumar-1711: Assert the expected size at the completion service as double check.
        m_isDummyRW = true;
        StartingNwRead();
    }

    // Starts timing a network read; the flavor (new-request-blocking, dummy,
    // or regular) is derived from the flags set by the Starting* precursors.
    void StartingNwRead()
    {
        BOOST_ASSERT(!(m_isDummyRW && m_isNewRequestBlockingRead));
        BOOST_ASSERT(m_numOfRequests > 0);

        RequestOperation typeOfNwRead;

        if (m_isNewRequestBlockingRead)
            typeOfNwRead = RequestOp_NwReadReqBlock;
        else if (m_isDummyRW)
            typeOfNwRead = RequestOp_NwReadDummyFromBuf;
        else
            typeOfNwRead = RequestOp_NwRead;

        // Dummy reads don't count; a real NwRead earlier already accounted for them.
        if (typeOfNwRead != RequestOp_NwReadDummyFromBuf)
            ++m_perfCounters.m_totalNwReadCnt;

        StartInternalTimer(typeOfNwRead);
    }

    // Completes the network read started above and folds the elapsed time /
    // byte count into the appropriate counters and latency buckets.
    void CompletingNwRead(int64_t bytesRead)
    {
        BOOST_ASSERT(!(m_isDummyRW && m_isNewRequestBlockingRead));
        BOOST_ASSERT(m_numOfRequests > 0 && m_perfCounters.m_totalNwReadCnt > 0);

        RequestOperation typeOfNwRead;

        if (m_isNewRequestBlockingRead)
            typeOfNwRead = RequestOp_NwReadReqBlock;
        else if (m_isDummyRW)
            typeOfNwRead = RequestOp_NwReadDummyFromBuf;
        else
            typeOfNwRead = RequestOp_NwRead;

        m_isNewRequestBlockingRead = false; // reset
        m_isDummyRW = false; // reset

        chrono::nanoseconds timeTaken = EndInternalTimer(typeOfNwRead);

        // RequestOp_NwReadDummyFromBuf doesn't have to update any stat, since
        // a NwRead operation before would've accounted for it.
        if (typeOfNwRead == RequestOp_NwReadDummyFromBuf)
            return;

        m_perfCounters.m_totalNwReadBytes += bytesRead;
        ++m_perfCounters.m_succNwReadCnt;

        if (typeOfNwRead == RequestOp_NwRead)
        {
            m_perfCounters.m_totalNwReadTime += NANO_TO_TICKS(timeTaken);

            Utils::IncrementBucket(
                Tunables::NwReadLatencyBuckets_ms,
                GetNormalizedThroughput(bytesRead, timeTaken),
                m_perfCounters.m_nwReadBuckets,
                Tunables::NwReadLatencyBucketsCount);
        }
        else // typeOfNwRead == RequestOp_NwReadReqBlock
        {
            if (!m_isPutFileMoreData)
            {
                // First read of a brand-new request: stamp the request start.
                m_requestStartTime = boost_pt::microsec_clock::universal_time();
                m_requestStartTimePoint = SteadyClock::now();

                m_newRequestBlockTime = timeTaken;
            }
            else
            {
                // Continuation of a multi-part putfile: the wait counts as
                // inter-request time of the ongoing put.
                m_perfCounters.m_totalPutReqInterRequestTime += NANO_TO_TICKS(timeTaken);
            }
        }
    }

    void ReceivedNwEof()
    {
        // On receiving EOF, consider the read as NOOP. Most likely, this would
        // end in a protocol error.
        BOOST_ASSERT(m_currOperation == RequestOp_NwRead || m_currOperation == RequestOp_NwReadReqBlock);

        // TODO-SanKumar-1710: We are considering this as a network read cnt++.
        // The client has the right to send EOF anywhere within the timeout, so should be fine.
        CompletingNwRead(0);
    }

    // Marks the start of a "dummy" write served from an in-memory buffer;
    // timed but excluded from the network-write statistics.
    void StartingDummyNwWriteFromBuffer(int64_t expectedBytesWritten)
    {
        BOOST_ASSERT(!m_isDummyRW && !m_isNewRequestBlockingRead);

        // TODO-SanKumar-1711: Assert the expected size at the completion service as double check.
        m_isDummyRW = true;
        StartingNwWrite();
    }

    void StartingNwWrite()
    {
        BOOST_ASSERT(!m_isNewRequestBlockingRead);

        RequestOperation typeOfNwWrite = m_isDummyRW ? RequestOp_NwWriteDummyFromBuf : RequestOp_NwWrite;

        if (typeOfNwWrite != RequestOp_NwWriteDummyFromBuf)
            ++m_perfCounters.m_totalNwWriteCnt;

        StartInternalTimer(typeOfNwWrite);
    }

    void CompletingNwWrite(int64_t bytesWritten)
    {
        BOOST_ASSERT(!m_isNewRequestBlockingRead);

        RequestOperation typeOfNwWrite = m_isDummyRW ? RequestOp_NwWriteDummyFromBuf : RequestOp_NwWrite;
        m_isDummyRW = false;

        chrono::nanoseconds timeTaken = EndInternalTimer(typeOfNwWrite);

        // RequestOp_NwWriteDummyFromBuf doesn't have to update any stat, since
        // a NwWrite operation before would've accounted for it.
        if (typeOfNwWrite == RequestOp_NwWriteDummyFromBuf)
            return;

        // typeOfNwWrite == RequestOp_NwWrite
        m_perfCounters.m_totalNwWriteTime += NANO_TO_TICKS(timeTaken);
        m_perfCounters.m_totalNwWrittenBytes += bytesWritten;
        ++m_perfCounters.m_succNwWriteCnt;
    }

    void StartingFileWrite()
    {
        ++m_perfCounters.m_totalFileWriteCnt;
        StartInternalTimer(RequestOp_FWrite);
    }

    void CompletingFileWrite(int64_t bytesWritten)
    {
        chrono::nanoseconds timeTaken = EndInternalTimer(RequestOp_FWrite);

        m_perfCounters.m_totalFileWriteTime += NANO_TO_TICKS(timeTaken);
        m_perfCounters.m_totalFileWrittenBytes += bytesWritten;

        Utils::IncrementBucket(
            Tunables::FWriteLatencyBuckets_ms,
            GetNormalizedThroughput(bytesWritten, timeTaken),
            m_perfCounters.m_fWriteBuckets,
            Tunables::FWriteLatencyBucketsCount);

        ++m_perfCounters.m_succFileWriteCnt;
    }

    void StartingFileRead()
    {
        ++m_perfCounters.m_totalFileReadCnt;
        StartInternalTimer(RequestOp_FRead);
    }

    void CompletingFileRead(int64_t bytesRead)
    {
        m_perfCounters.m_totalFileReadTime += NANO_TO_TICKS(EndInternalTimer(RequestOp_FRead));
        m_perfCounters.m_totalFileReadBytes += bytesRead;
        ++m_perfCounters.m_succFileReadCnt;
    }

    void StartingFileCompression()
    {
        StartInternalTimer(RequestOp_Compress);
    }

    void CompletingFileCompression()
    {
        m_perfCounters.m_totalFileCompressTime += NANO_TO_TICKS(EndInternalTimer(RequestOp_Compress));
    }

    void StartingFileFlush()
    {
        StartInternalTimer(RequestOp_FFlush);
    }

    void CompletingFileFlush()
    {
        m_perfCounters.m_totalFileFlushTime += NANO_TO_TICKS(EndInternalTimer(RequestOp_FFlush));
    }

    // Deprecated: misspelled legacy name kept for backward compatibility with
    // existing callers. Use CompletingFileFlush() instead.
    void CompleteingFileFlush()
    {
        CompletingFileFlush();
    }

    // Getter methods

    bool HasRespondedSuccess() const
    {
        return m_hasRespondedSuccess;
    }

    RequestFailure GetRequestFailure() const
    {
        return m_requestFailure.load(boost::memory_order_relaxed);
    }

    // Setter methods

    void MarkPutFileMoreData(bool moreData)
    {
        m_isPutFileMoreData = moreData;
    }

    void SuccessfullyResponded()
    {
        m_hasRespondedSuccess = true;
    }

    void SetRequestFailure(RequestFailure requestFailure)
    {
        // Update the failure, iff no error was already set. This is helpful
        // in ensuring the closest exception/error handler pops a specific
        // failure than the outer handler giving generic error.
        // This way original cause of the failure is preserved. Other examples
        // are request also timing out, response failing to send error, etc.
        RequestFailure allowedPrevFailure = RequestFailure_Success;

        // Ensuring atomicity of this exchange instead of using the costly
        // lock over all the members of this class. In a session, all the
        // other members are updated serially, but during timeout this is the
        // only member that could be set in parallel.
        if (m_requestFailure.compare_exchange_strong(allowedPrevFailure, requestFailure, boost::memory_order_seq_cst))
        {
            // If the error is set, while one of the following ops is in
            // progress, then increment the corresponding failed count.
            switch (m_currOperation)
            {
            case RequestOp_NwReadReqBlock:
            case RequestOp_NwRead:
                ++m_perfCounters.m_failedNwReadCnt;
                break;

            case RequestOp_NwWrite:
                ++m_perfCounters.m_failedNwWriteCnt;
                break;

            case RequestOp_FRead:
                ++m_perfCounters.m_failedFileReadCnt;
                break;

            case RequestOp_FWrite:
                ++m_perfCounters.m_failedFileWriteCnt;
                break;

            default: //NOOP
                break;
            }

            // TODO-SanKumar-1711: Should we reset m_currOperation here?
            // Could that cause a telemetry data error wrongly?
        }
    }

    void RequestCompleted()
    {
        RequestCompleted(false); // Signalled externally (only on successful completion/list file not found error).
    }

    void AcquiredRequestType(RequestType requestType)
    {
        // TODO-SanKumar-1710: Should we also take the request id at this point
        // and assert/runtime-check that all the data updates are coming for
        // the same request id? Overkill?
        m_requestType = requestType;
    }

    // Level movers

    // Records the session's host id (once) and promotes the data level from
    // server scope to session scope.
    void AcquiredHostId(const std::string& hostId)
    {
        // Before subsequent request handling, the host id of the session is
        // repeatedly retried, making any failure in assigning at login is transient.
        BOOST_ASSERT(m_hostId.empty() || m_hostId == hostId);

        try
        {
            // This is attempted at the start of every request handling, so
            // averting updates in case of redundant retry.
            if (!hostId.empty() && m_hostId.empty())
            {
                m_hostId = hostId;
                m_dataLevel = ReqTelLevel_Session;
            }
        }
        GENERIC_CATCH_LOG_ACTION("AcquiredHostId", m_hostId.clear());
    }

    // Records the file path of the current request and promotes the data
    // level to file scope (stepped back to session scope by Clear()).
    void AcquiredFilePath(const std::string& filePath)
    {
        try
        {
            m_filePath = filePath;
            m_dataLevel = ReqTelLevel_File;
        }
        GENERIC_CATCH_LOG_ACTION("AcquiredFilePath", m_filePath.clear());
    }

    // Flushes any pending data as the session logs out; if the logout is due
    // to an uncaptured error, records it as RequestFailure_UnknownError first.
    void SessionLoggingOut()
    {
        // TODO-SanKumar-1710: If the error is PutFileInProgress, then the
        // request telemetry data object contains the merged statistics of
        // the prev put requests and the current request. But when we log an
        // error, the error would always correspond to the current request.
        // There should be another put telemetry failure along with its stats addded.

        // TODO-SanKumar-1710: Should we track number of sessions as well? (created, active, destroyed)

        // m_hasRespondedSuccess == true, in case of successful logout.
        if (!m_hasRespondedSuccess && GetRequestFailure() == RequestFailure_Success)
        {
            // The session is logging out due to an error, but telemetry logic
            // hasn't captured the specific error.
            SetRequestFailure(RequestFailure_UnknownError);
        }

        RequestCompleted(true);
    }

    // Resets all per-request state; session-scoped members are preserved.
    void Clear()
    {
        // m_sessionId, m_isUsingSsl remain the same for the life time of the session. Don't reset.

        m_dataError = ReqTelErr_None;
        m_requestType = RequestType_Invalid;
        m_currOperation = RequestOp_None;
        m_isDummyRW = false;
        m_isNewRequestBlockingRead = false;

        // m_dataLevel, m_hostId remains through out the session
        if (m_dataLevel == ReqTelLevel_File)
        {
            // The file path belonged to the particular request. Step back
            // to receive the file path from the next request.
            m_dataLevel = ReqTelLevel_Session;
        }

        m_filePath.clear();
        m_hasRespondedSuccess = false;
        m_isPutFileMoreData = false;
        m_requestFailure = RequestFailure_Success; // 0
        m_requestStartTime = Defaults::s_defaultPTime;
        m_requestEndTime = Defaults::s_defaultPTime;
        m_requestStartTimePoint = Defaults::s_defaultTimePoint;
        m_requestEndTimePoint = Defaults::s_defaultTimePoint;
        m_numOfRequests = 0;
        m_durationCalculationStart = Defaults::s_defaultTimePoint;
        m_newRequestBlockTime = Defaults::s_zeroTime;
        m_totalReqSpecificOpTime = Defaults::s_zeroTime;

        m_perfCounters.Clear();
    }

    // True when all per-request state is at its cleared/default value.
    bool IsEmpty()
    {
        return
            m_dataError == ReqTelErr_None &&
            m_requestType == RequestType_Invalid &&
            m_currOperation == RequestOp_None &&
            m_isDummyRW == false &&
            m_isNewRequestBlockingRead == false &&
            m_filePath.empty() &&
            m_hasRespondedSuccess == false &&
            m_isPutFileMoreData == false &&
            m_requestFailure == RequestFailure_Success &&
            m_requestStartTime == Defaults::s_defaultPTime &&
            m_requestEndTime == Defaults::s_defaultPTime &&
            m_requestStartTimePoint == Defaults::s_defaultTimePoint &&
            m_requestEndTimePoint == Defaults::s_defaultTimePoint &&
            m_numOfRequests == 0 &&
            // TODO-SanKumar-1711: TBD
            // memcmp(m_nwReadBuckets) == 0
            // memcmp(m_fWriteBuckets) == 0
            m_durationCalculationStart == Defaults::s_defaultTimePoint &&
            m_newRequestBlockTime == Defaults::s_zeroTime &&
            m_totalReqSpecificOpTime == Defaults::s_zeroTime &&
            m_perfCounters.IsEmpty();
    }
};
}
#endif // !CXPS_REQUEST_TELEMETRY_DATA_H