Reporting.h (356 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <wdt/AbortChecker.h>
#include <wdt/ErrorCodes.h>
#include <wdt/WdtOptions.h>
#include <wdt/WdtTransferRequest.h>
#include <wdt/util/EncryptionUtils.h>
#include <algorithm>
#include <chrono>
#include <iterator>
#include <limits>
#include <string>
#include <unordered_map>
#include <vector>
#include <folly/synchronization/RWSpinLock.h>
namespace facebook {
namespace wdt {
const double kMbToB = 1024 * 1024;
const double kMicroToMilli = 1000;
const double kMicroToSec = 1000 * 1000;
const double kMilliToSec = 1000;
typedef std::chrono::high_resolution_clock Clock;
template <typename T>
int64_t durationMicros(T d) {
return std::chrono::duration_cast<std::chrono::microseconds>(d).count();
}
template <typename T>
int durationMillis(T d) {
return std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
}
template <typename T>
double durationSeconds(T d) {
return std::chrono::duration_cast<std::chrono::duration<double>>(d).count();
}
template <typename T>
std::ostream &operator<<(std::ostream &os, const std::vector<T> &v) {
std::copy(v.begin(), v.end(), std::ostream_iterator<T>(os, " "));
return os;
}
// TODO rename to ThreadResult
/// class representing statistics related to file transfer
class TransferStats {
private:
/// number of header bytes transferred
int64_t headerBytes_ = 0;
/// number of data bytes transferred
int64_t dataBytes_ = 0;
/// number of header bytes transferred as part of successful file transfer
int64_t effectiveHeaderBytes_ = 0;
/// number of data bytes transferred as part of successful file transfer
int64_t effectiveDataBytes_ = 0;
/// number of files successfully transferred
int64_t numFiles_ = 0;
/// number of blocks successfully transferred
int64_t numBlocks_ = 0;
/// number of failed transfers
int64_t failedAttempts_ = 0;
/// Total number of blocks sent by sender
int64_t numBlocksSend_{-1};
/// Total number of bytes sent by sender
int64_t totalSenderBytes_{-1};
/// status of the transfer
ErrorCode localErrCode_ = OK;
/// status of the remote
ErrorCode remoteErrCode_ = OK;
/// id of the owner object
std::string id_;
/// encryption type used
EncryptionType encryptionType_{ENC_NONE};
/// is tls enabled?
bool tls_{false};
/// mutex to support synchronized access
std::unique_ptr<folly::RWSpinLock> mutex_{nullptr};
public:
// making the object noncopyable
TransferStats(const TransferStats &stats) = delete;
TransferStats &operator=(const TransferStats &stats) = delete;
TransferStats(TransferStats &&stats) = default;
TransferStats &operator=(TransferStats &&stats) = default;
explicit TransferStats(bool isLocked = false) {
if (isLocked) {
mutex_ = std::make_unique<folly::RWSpinLock>();
}
}
explicit TransferStats(const std::string &id, bool isLocked = false)
: TransferStats(isLocked) {
id_ = id;
}
void reset() {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
headerBytes_ = dataBytes_ = 0;
effectiveHeaderBytes_ = effectiveDataBytes_ = 0;
numFiles_ = numBlocks_ = 0;
failedAttempts_ = 0;
localErrCode_ = remoteErrCode_ = OK;
}
/// @return the number of blocks sent by sender
int64_t getNumBlocksSend() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return numBlocksSend_;
}
/// @return the total sender bytes
int64_t getTotalSenderBytes() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return totalSenderBytes_;
}
/// @return number of header bytes transferred
int64_t getHeaderBytes() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return headerBytes_;
}
/// @return number of data bytes transferred
int64_t getDataBytes() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return dataBytes_;
}
/**
* @param needLocking specifies whether we need to lock or not. this is
* for performance improvement. in sender, we do not
* need locking for this call, even though the other
* calls have to be locked
*
* @return number of total bytes transferred
*/
int64_t getTotalBytes(bool needLocking = true) const {
if (needLocking) {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return headerBytes_ + dataBytes_;
}
return headerBytes_ + dataBytes_;
}
/**
* @return number of header bytes transferred as part of successful file
* transfer
*/
int64_t getEffectiveHeaderBytes() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return effectiveHeaderBytes_;
}
/**
* @return number of data bytes transferred as part of successful file
* transfer
*/
int64_t getEffectiveDataBytes() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return effectiveDataBytes_;
}
/**
* @return number of total bytes transferred as part of successful file
* transfer
*/
int64_t getEffectiveTotalBytes() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return effectiveHeaderBytes_ + effectiveDataBytes_;
}
/// @return number of files successfully transferred
int64_t getNumFiles() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return numFiles_;
}
/// @return number of blocks successfully transferred
int64_t getNumBlocks() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return numBlocks_;
}
/// @return number of failed transfers
int64_t getFailedAttempts() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return failedAttempts_;
}
/// @return error code based on combinator of local and remote error
ErrorCode getErrorCode() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return getMoreInterestingError(localErrCode_, remoteErrCode_);
}
/// @return status of the transfer on this side
ErrorCode getLocalErrorCode() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return localErrCode_;
}
/// @return status of the transfer on the remote end
ErrorCode getRemoteErrorCode() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return remoteErrCode_;
}
const std::string &getId() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return id_;
}
/// @param number of additional data bytes transferred
void addDataBytes(int64_t count) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
dataBytes_ += count;
}
/// @param number of additional header bytes transferred
void addHeaderBytes(int64_t count) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
headerBytes_ += count;
}
/// @param set num blocks send
void setNumBlocksSend(int64_t numBlocksSend) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
numBlocksSend_ = numBlocksSend;
}
/// @param set total sender bytes
void setTotalSenderBytes(int64_t totalSenderBytes) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
totalSenderBytes_ = totalSenderBytes;
}
/// one more file transfer failed
void incrFailedAttempts() {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
failedAttempts_++;
}
/// @param status of the transfer
void setLocalErrorCode(ErrorCode errCode) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
localErrCode_ = errCode;
}
/// @param status of the transfer on the remote end
void setRemoteErrorCode(ErrorCode remoteErrCode) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
remoteErrCode_ = remoteErrCode;
}
/// @param id of the corresponding entity
void setId(const std::string &id) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
id_ = id;
}
/// @param numFiles number of files successfully send
void setNumFiles(int64_t numFiles) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
numFiles_ = numFiles;
}
/// one more block successfully transferred
void incrNumBlocks() {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
numBlocks_++;
}
void decrNumBlocks() {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
numBlocks_--;
}
/**
* @param headerBytes header bytes transfered part of a successful file
* transfer
* @param dataBytes data bytes transferred part of a successful file
* transfer
*/
void addEffectiveBytes(int64_t headerBytes, int64_t dataBytes) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
effectiveHeaderBytes_ += headerBytes;
effectiveDataBytes_ += dataBytes;
}
void subtractEffectiveBytes(int64_t headerBytes, int64_t dataBytes) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
effectiveHeaderBytes_ -= headerBytes;
effectiveDataBytes_ -= dataBytes;
}
void setEncryptionType(EncryptionType encryptionType) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
encryptionType_ = encryptionType;
}
EncryptionType getEncryptionType() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return encryptionType_;
}
void setTls(bool tls) {
folly::RWSpinLock::WriteHolder lock(mutex_.get());
tls_ = tls;
}
bool getTls() const {
folly::RWSpinLock::ReadHolder lock(mutex_.get());
return tls_;
}
TransferStats &operator+=(const TransferStats &stats);
friend std::ostream &operator<<(std::ostream &os, const TransferStats &stats);
};
/**
* Class representing entire client transfer report.
* Unit are mebibyte (MiB), ie 1048576 bytes which we call "Mbytes"
* for familiarity
*/
class TransferReport {
public:
// TODO: too many constructor parameters, needs to clean-up
/**
* This constructor moves all the stat objects to member variables. This is
* only called at the end of transfer by the sender
*/
TransferReport(std::vector<TransferStats> &transferredSourceStats,
std::vector<TransferStats> &failedSourceStats,
std::vector<TransferStats> &threadStats,
std::vector<std::string> &failedDirectories, double totalTime,
int64_t totalFileSize, int64_t numDiscoveredFiles,
int64_t previouslySentBytes, bool fileDiscoveryFinished);
/**
* This function does not move the thread stats passed to it. This is called
* by the progress reporter thread.
*/
TransferReport(const std::vector<TransferStats> &threadStats,
double totalTime, int64_t totalFileSize,
int64_t numDiscoveredFiles, bool fileDiscoveryFinished);
TransferReport(TransferStats &&stats, double totalTime, int64_t totalFileSize,
int64_t numDiscoveredFiles, bool fileDiscoveryFinished);
/// constructor used by receiver, does move the stats
explicit TransferReport(TransferStats &&globalStats);
/// @return summary of the report
const TransferStats &getSummary() const {
return summary_;
}
/// @return transfer throughput in Mbytes/sec
double getThroughputMBps() const {
return summary_.getEffectiveTotalBytes() / totalTime_ / kMbToB;
}
/// @return total time taken in transfer
double getTotalTime() const {
return totalTime_;
}
/// @return stats for successfully transferred sources
const std::vector<TransferStats> &getTransferredSourceStats() const {
return transferredSourceStats_;
}
/// @return stats for failed sources
const std::vector<TransferStats> &getFailedSourceStats() const {
return failedSourceStats_;
}
/// @return stats for threads
const std::vector<TransferStats> &getThreadStats() const {
return threadStats_;
}
const std::vector<std::string> &getFailedDirectories() const {
return failedDirectories_;
}
int64_t getTotalFileSize() const {
return totalFileSize_;
}
/// @return recent throughput in Mbytes/sec
double getCurrentThroughputMBps() const {
return currentThroughput_ / kMbToB;
}
/// @param stats stats to added
void addTransferStats(const TransferStats &stats) {
summary_ += stats;
}
/// @param currentThroughput current throughput
void setCurrentThroughput(double currentThroughput) {
currentThroughput_ = currentThroughput;
}
void setTotalTime(double totalTime) {
totalTime_ = totalTime;
}
void setTotalFileSize(int64_t totalFileSize) {
totalFileSize_ = totalFileSize;
}
void setErrorCode(const ErrorCode errCode) {
summary_.setLocalErrorCode(errCode);
summary_.setRemoteErrorCode(errCode);
}
int64_t getNumDiscoveredFiles() const {
return numDiscoveredFiles_;
}
bool fileDiscoveryFinished() const {
return fileDiscoveryFinished_;
}
int64_t getPreviouslySentBytes() const {
return previouslySentBytes_;
}
friend std::ostream &operator<<(std::ostream &os,
const TransferReport &report);
private:
TransferStats summary_;
/// stats for successfully transferred sources
std::vector<TransferStats> transferredSourceStats_;
/// stats for failed sources
std::vector<TransferStats> failedSourceStats_;
/// stats for client threads
std::vector<TransferStats> threadStats_;
/// directories which could not be opened
std::vector<std::string> failedDirectories_;
/// total transfer time
double totalTime_{0};
/// sum of all the file sizes
int64_t totalFileSize_{0};
/// recent throughput in bytes/sec
double currentThroughput_{0};
/// Count of all files discovered so far
int64_t numDiscoveredFiles_{0};
/// Number of bytes sent in previous transfers
int64_t previouslySentBytes_{0};
/// Is file discovery finished?
bool fileDiscoveryFinished_{false};
};
/**
* This class represents interface and default implementation of progress
* reporting
*/
class ProgressReporter {
public:
explicit ProgressReporter(const WdtTransferRequest &transferRequest)
: transferRequest_(transferRequest) {
isTty_ = isatty(STDOUT_FILENO);
}
/// this method is called before the transfer starts
virtual void start() {
}
/**
* This method gets called repeatedly with interval defined by
* progress_report_interval. If stdout is a terminal, then it displays
* transfer progress in stdout. Example output [===> ] 30% 5.00 Mbytes/sec.
* Else, it prints progress details in stdout.
*
* @param report current transfer report
*/
virtual void progress(const std::unique_ptr<TransferReport> &report);
/**
* This method gets called after the transfer ends
*
* @param report final transfer report
*/
virtual void end(const std::unique_ptr<TransferReport> &report);
virtual ~ProgressReporter() {
}
protected:
/// Reference to the wdt transfer request for the wdt base
/// object using the progress reporter
const WdtTransferRequest &transferRequest_;
private:
/**
* Displays progress of the transfer in stdout
*
* @param progress progress percentage
* @param throughput average throughput
* @param currentThroughput recent throughput
* @param numDiscoveredFiles number of files discovered so far
* @param fileDiscoveryFinished true once file discovery has compeleted
*/
void displayProgress(int progress, double averageThroughput,
double currentThroughput, int64_t numDiscoveredFiles,
bool fileDiscoveryFinished);
/**
* logs progress details
*
* @param effectiveDataBytes number of bytes sent
* @param progress progress percentage
* @param throughput average throughput
* @param currentThroughput recent throughput
* @param numDiscoveredFiles number of files discovered so far
* @param fileDiscoveryFinished true once file discovery has compeleted
*/
void logProgress(int64_t effectiveDataBytes, int progress,
double averageThroughput, double currentThroughput,
int64_t numDiscoveredFiles, bool fileDiscoveryFinished);
/// whether stdout is redirected to a terminal or not
bool isTty_;
};
/// class representing perf stat collection
class PerfStatReport {
public:
enum StatType {
SOCKET_READ,
SOCKET_WRITE,
FILE_OPEN,
FILE_CLOSE,
FILE_READ,
FILE_WRITE,
SYNC_FILE_RANGE,
FSYNC_STATS, // just 'FSYNC' is defined on Windows/conflicts
FILE_SEEK,
THROTTLER_SLEEP,
RECEIVER_WAIT_SLEEP, // receiver sleep duration between sending wait cmd to
// sender. A high sum for this suggests threads
// were not properly load balanced
DIRECTORY_CREATE,
IOCTL,
UNLINK,
FADVISE,
END
};
explicit PerfStatReport(const WdtOptions &options);
/**
* @param statType stat-type
* @param timeInMicros time taken by the operation in microseconds
*/
void addPerfStat(StatType statType, int64_t timeInMicros);
friend std::ostream &operator<<(std::ostream &os,
const PerfStatReport &statReport);
PerfStatReport &operator+=(const PerfStatReport &statReport);
private:
const static int kNumTypes_ = PerfStatReport::END;
const static std::string statTypeDescription_[];
const static int32_t kHistogramBuckets[];
/// mapping from time to number of entries
std::unordered_map<int64_t, int64_t> perfStats_[kNumTypes_];
/// max time for different stat types
int64_t maxValueMicros_[kNumTypes_] = {0};
/// min time for different stat types
int64_t minValueMicros_[kNumTypes_] = {std::numeric_limits<int64_t>::max()};
/// number of records for different stat types
int64_t count_[kNumTypes_] = {0};
/// sum of all records for different stat types
int64_t sumMicros_[kNumTypes_] = {0};
/// network timeout in milliseconds
int networkTimeoutMillis_;
/// mutex to support synchronized access
mutable folly::RWSpinLock mutex_;
};
}
}