util/WdtSocket.cpp (734 lines of code) (raw):
#include <wdt/util/WdtSocket.h>
#include <folly/lang/Bits.h>
#include <folly/String.h> // for humanify
#include <netdb.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <wdt/Protocol.h>
#ifdef WDT_HAS_SOCKIOS_H
#include <linux/sockios.h>
#endif
namespace facebook {
namespace wdt {
WdtSocket::WdtSocket(ThreadCtx &threadCtx, const int port,
const EncryptionParams &encryptionParams,
const int64_t ivChangeInterval,
Func &&tagVerificationSuccessCallback)
: port_(port),
threadCtx_(threadCtx),
encryptionParams_(encryptionParams),
ivChangeInterval_(ivChangeInterval),
tagVerificationSuccessCallback_(
std::move(tagVerificationSuccessCallback)) {
if (encryptionTypeToTagLen(encryptionParams_.getType())) {
// encryption has tag verification support
writeTagInterval_ = threadCtx_.getOptions().encryption_tag_interval_bytes;
}
resetEncryptor();
resetDecryptor();
}
// TODO: consider refactoring this to return error code
void WdtSocket::readEncryptionSettingsOnce(int timeoutMs) {
if (!encryptionParams_.isSet() || encryptionSettingsRead_) {
return;
}
WDT_CHECK(!encryptionParams_.getSecret().empty());
int numRead = readInternal(buf_, 1, timeoutMs, true);
if (numRead != 1) {
WLOG(ERROR) << "Failed to read encryption settings " << numRead << " "
<< port_;
return;
}
if (buf_[0] != Protocol::ENCRYPTION_CMD) {
WLOG(ERROR) << "Expected to read ENCRYPTION_CMD(e), but got " << buf_[0];
readErrorCode_ = UNEXPECTED_CMD_ERROR;
return;
}
int toRead = Protocol::kEncryptionCmdLen - 1; // already read 1 byte for cmd
numRead = readInternal(buf_, toRead,
threadCtx_.getOptions().read_timeout_millis, true);
if (numRead != toRead) {
WLOG(ERROR) << "Failed to read encryption settings " << numRead << " "
<< toRead << " " << port_;
readErrorCode_ = SOCKET_READ_ERROR;
return;
}
int64_t off = 0;
EncryptionType encryptionType;
std::string iv;
if (!Protocol::decodeEncryptionSettings(
buf_, off, Protocol::kEncryptionCmdLen, encryptionType, iv,
readTagInterval_)) {
WLOG(ERROR) << "Failed to decode encryption settings";
readErrorCode_ = PROTOCOL_ERROR;
return;
}
if (encryptionType != encryptionParams_.getType()) {
WLOG(ERROR) << "Encryption type mismatch "
<< encryptionTypeToStr(encryptionType) << " "
<< encryptionTypeToStr(encryptionParams_.getType());
readErrorCode_ = PROTOCOL_ERROR;
return;
}
if (readTagInterval_ < 0) {
WLOG(ERROR) << "Encryption tag verification interval can't be negative "
<< readTagInterval_;
readErrorCode_ = PROTOCOL_ERROR;
return;
}
if ((readTagInterval_ > 0) && (encryptionTypeToTagLen(encryptionType) == 0)) {
WLOG(ERROR) << "Tag verification should not be enabled for "
<< encryptionTypeToStr(encryptionType) << " "
<< readTagInterval_;
readErrorCode_ = PROTOCOL_ERROR;
return;
}
if (!decryptor_->start(encryptionParams_, iv)) {
readErrorCode_ = ENCRYPTION_ERROR;
return;
}
WLOG(INFO) << "Successfully read encryption settings " << port_ << " "
<< encryptionTypeToStr(encryptionType);
encryptionSettingsRead_ = true;
}
void WdtSocket::writeEncryptionSettingsOnce() {
if (!encryptionParams_.isSet() || encryptionSettingsWritten_) {
return;
}
WDT_CHECK(!encryptionParams_.getSecret().empty());
int timeoutMs = threadCtx_.getOptions().write_timeout_millis;
std::string iv;
if (!encryptor_->start(encryptionParams_, iv)) {
writeErrorCode_ = ENCRYPTION_ERROR;
return;
}
int64_t off = 0;
buf_[off++] = Protocol::ENCRYPTION_CMD;
Protocol::encodeEncryptionSettings(
buf_, off, off + Protocol::kEncryptionCmdLen, encryptionParams_.getType(),
iv, writeTagInterval_);
int written = writeInternal(buf_, off, timeoutMs, false);
if (written != off) {
WLOG(ERROR) << "Failed to write encryption settings " << written << " "
<< port_;
return;
}
encryptionSettingsWritten_ = true;
}
int WdtSocket::readInternal(char *buf, int nbyte, int timeoutMs, bool tryFull) {
int numRead = readWithAbortCheck(buf, nbyte, timeoutMs, tryFull);
if (numRead == 0) {
readErrorCode_ = SOCKET_READ_ERROR;
return 0;
}
if (numRead < 0) {
if (errno == EAGAIN || errno == EINTR) {
readErrorCode_ = WDT_TIMEOUT;
} else {
readErrorCode_ = SOCKET_READ_ERROR;
}
return numRead;
}
// clear error code if successful
readErrorCode_ = OK;
return numRead;
}
int WdtSocket::writeInternal(const char *buf, int nbyte, int timeoutMs,
bool retry) {
int count = 0;
int written = 0;
while (written < nbyte) {
int w = writeWithAbortCheck(buf + written, nbyte - written, timeoutMs,
/* always try to write everything */ true);
if (w <= 0) {
break;
}
written += w;
count++;
if (!retry) {
break;
}
}
if (written != nbyte) {
WLOG(ERROR) << "Socket write failure " << written << " " << nbyte;
writeErrorCode_ = SOCKET_WRITE_ERROR;
return -1;
}
WLOG_IF(INFO, count > 1) << "Took " << count << " attempts to write " << nbyte
<< " bytes to socket";
return written;
}
bool WdtSocket::checkAndChangeDecryptionIv(const std::string &tag) {
if (ivChangeInterval_ == 0) {
return true;
}
if (decryptor_->getNumProcessed() + readTagInterval_ <= ivChangeInterval_) {
// can wait till next tag read
return true;
}
WLOG(INFO) << "Need to change decryption iv " << port_;
// read the new iv
std::string iv;
iv.resize(kAESBlockSize);
const int numRead =
readInternal(&(iv.front()), kAESBlockSize,
threadCtx_.getOptions().read_timeout_millis, true);
if (numRead != kAESBlockSize) {
WLOG(ERROR) << "Unable to read encryption iv " << numRead << " "
<< kAESBlockSize;
return false;
}
if (!decryptor_->finish(tag)) {
WLOG(ERROR) << "Failed to verify encryption tag";
return false;
}
resetDecryptor();
if (!decryptor_->start(encryptionParams_, iv)) {
return false;
}
return true;
}
std::string WdtSocket::readEncryptionTag() {
std::string tag;
const int toRead = encryptionTypeToTagLen(encryptionParams_.getType());
tag.resize(toRead);
int read = readInternal(&(tag.front()), tag.size(),
threadCtx_.getOptions().read_timeout_millis, true);
if (read != toRead) {
WLOG(ERROR) << "Unable to read tag, got " << read << " needed " << toRead
<< " " << folly::humanify(tag);
tag.clear();
// It is important to mark this as encryption error. Because, we treat
// WDT_TIMEOUT as retryable error. But, a failure to read the tag makes this
// socket unusable.
// If we want to make this failure retryable, then, we have to cache the
// part of the tag read which is complicated.
readErrorCode_ = ENCRYPTION_ERROR;
}
WVLOG(1) << "Encryption tag read " << folly::humanify(tag) << " totalRead_ "
<< totalRead_ << " tag interval " << readTagInterval_;
return tag;
}
int WdtSocket::computeNextTagOffset(int64_t totalProcessed,
int64_t tagInterval) {
if (totalProcessed == 0) {
return tagInterval;
}
int nextTagOffset = (tagInterval - (totalProcessed % tagInterval));
if (nextTagOffset == tagInterval) {
nextTagOffset = 0;
}
return nextTagOffset;
}
int WdtSocket::readAndDecrypt(char *buf, int nbyte, int timeoutMs,
bool tryFull) {
WDT_CHECK_GT(nbyte, 0);
const bool encrypt = encryptionParams_.isSet();
WDT_CHECK(encrypt);
// tag is transferred in plain text
int numRead = readInternal(buf, nbyte, timeoutMs, tryFull);
if (numRead <= 0) {
return numRead;
}
// have to decrypt data
if (!decryptor_->decrypt(buf, numRead, buf)) {
readErrorCode_ = ENCRYPTION_ERROR;
return -1;
}
return numRead;
}
int WdtSocket::readAndDecryptWithTag(char *buf, int nbyte, int timeoutMs,
bool tryFull) {
WDT_CHECK_GT(readTagInterval_, 0);
WDT_CHECK_LE(nbyte, readTagInterval_);
// first try to figure out whether this read will contain a tag
const int nextTagOffset = computeNextTagOffset(totalRead_, readTagInterval_);
int numRead = 0;
if (nextTagOffset < nbyte) {
// tag is contained in this read
if (nextTagOffset > 0) {
// try to read till the tag
const int ret = readAndDecrypt(buf, nextTagOffset, timeoutMs, tryFull);
if (ret <= 0) {
// read error
return ret;
}
totalRead_ += ret;
if (ret < nextTagOffset) {
// couldn't read till the tag. Tag will get read during next read
return ret;
}
WDT_CHECK_EQ(nextTagOffset, ret);
numRead = ret;
}
WDT_CHECK_EQ(0, totalRead_ % readTagInterval_);
// now read and verify tag
const std::string tag = readEncryptionTag();
if (tag.empty()) {
// readEncryptionTag already logs error
return -1;
}
if (!decryptor_->verifyTag(tag)) {
// verifyTag logs
readErrorCode_ = ENCRYPTION_ERROR;
return -1;
}
// tag verification successful, inform higher layer
if (tagVerificationSuccessCallback_ != nullptr) {
tagVerificationSuccessCallback_();
}
if (!checkAndChangeDecryptionIv(tag)) {
readErrorCode_ = ENCRYPTION_ERROR;
return -1;
}
}
// now try to read rest of the data
const int ret =
readAndDecrypt(buf + numRead, nbyte - numRead, timeoutMs, tryFull);
if (ret <= 0) {
// read error
return (numRead > 0 ? numRead : ret);
}
totalRead_ += ret;
return numRead + ret;
}
int WdtSocket::readWithTimeout(char *buf, int nbyte, int timeoutMs,
bool tryFull) {
WDT_CHECK_GT(nbyte, 0);
if (readErrorCode_ != OK && readErrorCode_ != WDT_TIMEOUT) {
WLOG(ERROR) << "Socket read failed before, not trying to read again "
<< port_;
return -1;
}
readErrorCode_ = OK;
int numRead = 0;
// first try to find encryption settings
readEncryptionSettingsOnce(timeoutMs);
if (supportUnencryptedPeer_ && readErrorCode_ == UNEXPECTED_CMD_ERROR) {
WLOG(WARNING)
<< "Turning off encryption since the other side does not support "
"encryption "
<< port_;
readErrorCode_ = OK;
buf[0] = buf_[0];
numRead = 1;
// also turn off encryption
encryptionParams_.erase();
} else if (readErrorCode_ != OK) {
return -1;
}
const bool encrypt = encryptionParams_.isSet();
if (!encrypt) {
// handle the non-encryption case
int ret = readInternal(buf + numRead, nbyte - numRead, timeoutMs, tryFull);
if (ret >= 0) {
return numRead + ret;
}
// read failure
return (numRead > 0 ? numRead : -1);
}
// handle encryption case
WDT_CHECK_EQ(0, numRead);
if (readTagInterval_ <= 0) {
return readAndDecrypt(buf, nbyte, timeoutMs, tryFull);
}
// tag verification enabled
// have to break the read in chunks of readTagInterval_
while (numRead < nbyte) {
const int remaining = nbyte - numRead;
const int toRead = std::min(remaining, readTagInterval_);
const int ret =
readAndDecryptWithTag(buf + numRead, toRead, timeoutMs, tryFull);
if (ret <= 0) {
// read error
return (numRead > 0 ? numRead : ret);
}
numRead += ret;
if (ret < toRead) {
// couldn't read full data
return numRead;
}
}
return numRead;
}
int WdtSocket::read(char *buf, int nbyte, bool tryFull) {
return readWithTimeout(buf, nbyte,
threadCtx_.getOptions().read_timeout_millis, tryFull);
}
int WdtSocket::encryptAndWrite(char *buf, int nbyte, int timeoutMs,
bool retry) {
WDT_CHECK_GT(nbyte, 0);
const bool encrypt = encryptionParams_.isSet();
WDT_CHECK(encrypt);
if (!encryptor_->encrypt(buf, nbyte, buf)) {
writeErrorCode_ = ENCRYPTION_ERROR;
return -1;
}
int written = writeInternal(buf, nbyte, timeoutMs, retry);
if (written != nbyte) {
WLOG(ERROR) << "Socket write failure " << written << " " << nbyte;
writeErrorCode_ = SOCKET_WRITE_ERROR;
}
return written;
}
bool WdtSocket::checkAndChangeEncryptionIv() {
if (ivChangeInterval_ == 0) {
return true;
}
if (encryptor_->getNumProcessed() + writeTagInterval_ <= ivChangeInterval_) {
// can wait till next tag write
return true;
}
WLOG(INFO) << "Need to change encryption iv " << port_;
resetEncryptor();
std::string iv;
if (!encryptor_->start(encryptionParams_, iv)) {
return false;
}
WDT_CHECK_EQ(kAESBlockSize, iv.size());
const int written =
writeInternal(iv.data(), kAESBlockSize,
threadCtx_.getOptions().write_timeout_millis, false);
if (written != kAESBlockSize) {
WLOG(ERROR) << "Unable to write new encryption iv " << written << " "
<< kAESBlockSize;
return false;
}
return true;
}
bool WdtSocket::writeEncryptionTag() {
const std::string tag = encryptor_->computeCurrentTag();
if (tag.empty()) {
// computeCurrentTag logs
writeErrorCode_ = ENCRYPTION_ERROR;
return false;
}
const int toWrite = tag.size();
const int written = writeInternal(
tag.data(), toWrite, threadCtx_.getOptions().write_timeout_millis, false);
if (written != toWrite) {
WLOG(ERROR) << "Unable to write encryption tag " << written << " "
<< toWrite;
return false;
}
WVLOG(1) << "Encryption tag written " << folly::humanify(tag)
<< " totalWritten_ " << totalWritten_ << " tag interval "
<< writeTagInterval_;
return true;
}
int WdtSocket::encryptAndWriteWithTag(char *buf, int nbyte, int timeoutMs,
bool retry) {
WDT_CHECK_GT(writeTagInterval_, 0);
WDT_CHECK_LE(nbyte, writeTagInterval_);
// first try to figure out whether this write will contain a tag
const int nextTagOffset =
computeNextTagOffset(totalWritten_, writeTagInterval_);
int written = 0;
if (nextTagOffset < nbyte) {
// tag is contained in this write
if (nextTagOffset > 0) {
// try to write till the tag
const int ret = encryptAndWrite(buf, nextTagOffset, timeoutMs, retry);
if (ret != nextTagOffset) {
return -1;
}
totalWritten_ += ret;
written = ret;
}
WDT_CHECK_EQ(0, totalWritten_ % writeTagInterval_);
// now write the tag
if (!writeEncryptionTag()) {
return -1;
}
if (!checkAndChangeEncryptionIv()) {
writeErrorCode_ = ENCRYPTION_ERROR;
return -1;
}
}
// now try to write rest of the data
const int remainingWrite = nbyte - written;
const int ret =
encryptAndWrite(buf + written, remainingWrite, timeoutMs, retry);
if (ret != remainingWrite) {
return -1;
}
totalWritten_ += ret;
return nbyte;
}
int WdtSocket::write(char *buf, int nbyte, bool retry) {
WDT_CHECK_GT(nbyte, 0);
if (writeErrorCode_ != OK) {
WLOG(ERROR) << "Socket write failed before, not trying to write again "
<< port_;
return -1;
}
// first write encryption settings once
writeEncryptionSettingsOnce();
if (writeErrorCode_ != OK) {
return -1;
}
const int timeoutMs = threadCtx_.getOptions().write_timeout_millis;
const bool encrypt = encryptionParams_.isSet();
// handle no-encryption case
if (!encrypt) {
return writeInternal(buf, nbyte, timeoutMs, retry);
}
if (writeTagInterval_ <= 0) {
return encryptAndWrite(buf, nbyte, timeoutMs, retry);
}
// tag verification enabled
// have to break the write in chunks of writeTagInterval_
int written = 0;
while (written < nbyte) {
const int remaining = nbyte - written;
const int toWrite = std::min(remaining, writeTagInterval_);
const int ret =
encryptAndWriteWithTag(buf + written, toWrite, timeoutMs, retry);
if (ret != toWrite) {
// write failure
return -1;
}
written += ret;
}
return written;
}
int64_t WdtSocket::readWithAbortCheck(char *buf, int64_t nbyte, int timeoutMs,
bool tryFull) {
PerfStatCollector statCollector(threadCtx_, PerfStatReport::SOCKET_READ);
return ioWithAbortCheck(::read, buf, nbyte, timeoutMs, tryFull);
}
int64_t WdtSocket::writeWithAbortCheck(const char *buf, int64_t nbyte,
int timeoutMs, bool tryFull) {
PerfStatCollector statCollector(threadCtx_, PerfStatReport::SOCKET_WRITE);
return ioWithAbortCheck(::write, buf, nbyte, timeoutMs, tryFull);
}
template <typename F, typename T>
int64_t WdtSocket::ioWithAbortCheck(F readOrWrite, T tbuf, int64_t numBytes,
int timeoutMs, bool tryFull) {
WDT_CHECK(threadCtx_.getAbortChecker() != nullptr)
<< "abort checker can not be null";
bool checkAbort = (threadCtx_.getOptions().abort_check_interval_millis > 0);
auto startTime = Clock::now();
int64_t doneBytes = 0;
int retries = 0;
while (doneBytes < numBytes) {
const int64_t ret =
readOrWrite(fd_, tbuf + doneBytes, numBytes - doneBytes);
if (ret < 0) {
// error
if (errno != EINTR && errno != EAGAIN) {
WPLOG(ERROR) << "non-retryable error encountered during socket io "
<< fd_ << " " << doneBytes << " " << retries;
return (doneBytes > 0 ? doneBytes : ret);
}
} else if (ret == 0) {
// eof
WVLOG(1) << "EOF received during socket io. fd : " << fd_
<< ", finished bytes : " << doneBytes
<< ", retries : " << retries;
return doneBytes;
} else {
// success
doneBytes += ret;
if (!tryFull) {
// do not have to read/write entire data
return doneBytes;
}
}
if (checkAbort && threadCtx_.getAbortChecker()->shouldAbort()) {
WLOG(ERROR) << "transfer aborted during socket io " << fd_ << " "
<< doneBytes << " " << retries;
return (doneBytes > 0 ? doneBytes : -1);
}
if (timeoutMs > 0) {
int duration = durationMillis(Clock::now() - startTime);
if (duration >= timeoutMs) {
WLOG(INFO) << "socket io timed out after " << duration
<< " ms, retries " << retries << " fd " << fd_
<< " doneBytes " << doneBytes;
return (doneBytes > 0 ? doneBytes : -1);
}
}
retries++;
}
WVLOG_IF(1, retries > 1) << "socket io for " << doneBytes << " bytes took "
<< retries << " retries";
return doneBytes;
}
ErrorCode WdtSocket::shutdownWrites() {
ErrorCode code = finalizeWrites(true);
if (::shutdown(fd_, SHUT_WR) < 0) {
if (code == OK) {
WPLOG(WARNING) << "Socket shutdown failed for fd " << fd_;
code = ERROR;
}
}
return code;
}
ErrorCode WdtSocket::expectEndOfStream() {
return finalizeReads(true);
}
// TODO: Seems like we need to reset/clear encry/decr even on errors as we
// reuse/restart/recycle encryption objects and sockets (more correctness
// analysis of error cases needed)
ErrorCode WdtSocket::finalizeReads(bool doTagIOs) {
WVLOG(1) << "Finalizing reads/encryption " << port_ << " " << fd_;
const int toRead = encryptionTypeToTagLen(encryptionParams_.getType());
std::string tag;
ErrorCode code = OK;
if (toRead && doTagIOs) {
tag = readEncryptionTag();
if (tag.empty()) {
WLOG(ERROR) << "Unable to read tag at the end of stream " << port_;
code = ENCRYPTION_ERROR;
}
}
if (!decryptor_->finish(tag)) {
code = ENCRYPTION_ERROR;
}
readsFinalized_ = true;
return code;
}
ErrorCode WdtSocket::finalizeWrites(bool doTagIOs) {
WVLOG(1) << "Finalizing writes/encryption " << port_ << " " << fd_;
ErrorCode code = OK;
std::string tag;
if (!encryptor_->finish(tag)) {
code = ENCRYPTION_ERROR;
}
if (!tag.empty() && doTagIOs) {
const int timeoutMs = threadCtx_.getOptions().write_timeout_millis;
const int expected = tag.size();
if (writeInternal(tag.data(), tag.size(), timeoutMs, false) != expected) {
WPLOG(ERROR) << "Encryption Tag write error";
code = ENCRYPTION_ERROR;
}
}
writesFinalized_ = true;
return code;
}
ErrorCode WdtSocket::closeConnection() {
return closeConnectionInternal(true);
}
void WdtSocket::closeNoCheck() {
closeConnectionInternal(false);
}
ErrorCode WdtSocket::closeConnectionInternal(bool doTagIOs) {
WVLOG(1) << "Closing socket " << port_ << " " << fd_;
if (fd_ < 0) {
return OK;
}
ErrorCode errorCode = getNonRetryableErrCode();
if (!writesFinalized_) {
errorCode = getMoreInterestingError(errorCode, finalizeWrites(doTagIOs));
}
if (!readsFinalized_) {
errorCode = getMoreInterestingError(errorCode, finalizeReads(doTagIOs));
}
if (::close(fd_) != 0) {
WPLOG(ERROR) << "Failed to close socket " << fd_ << " " << port_;
errorCode = getMoreInterestingError(ERROR, errorCode);
}
// This looks like a reset() make it explicit (and check it's complete)
fd_ = -1;
readErrorCode_ = OK;
writeErrorCode_ = OK;
encryptionSettingsRead_ = false;
encryptionSettingsWritten_ = false;
writesFinalized_ = false;
readsFinalized_ = false;
totalRead_ = 0;
totalWritten_ = 0;
resetEncryptor();
resetDecryptor();
WVLOG(1) << "Error code from close " << errorCodeToStr(errorCode);
return errorCode;
}
int WdtSocket::getFd() const {
return fd_;
}
void WdtSocket::setFd(int fd) {
fd_ = fd;
}
int WdtSocket::getPort() const {
return port_;
}
void WdtSocket::setPort(int port) {
port_ = port;
}
EncryptionType WdtSocket::getEncryptionType() const {
return encryptionParams_.getType();
}
ErrorCode WdtSocket::getReadErrCode() const {
return readErrorCode_;
}
ErrorCode WdtSocket::getWriteErrCode() const {
return writeErrorCode_;
}
ErrorCode WdtSocket::getNonRetryableErrCode() const {
ErrorCode errCode = OK;
if (readErrorCode_ != OK && readErrorCode_ != SOCKET_READ_ERROR &&
readErrorCode_ != WDT_TIMEOUT && readErrorCode_ != ENCRYPTION_ERROR) {
errCode = getMoreInterestingError(errCode, readErrorCode_);
}
if (writeErrorCode_ != OK && writeErrorCode_ != SOCKET_WRITE_ERROR &&
writeErrorCode_ != ENCRYPTION_ERROR) {
errCode = getMoreInterestingError(errCode, writeErrorCode_);
}
return errCode;
}
int WdtSocket::getEffectiveTimeout(int networkTimeout) {
int abortInterval = threadCtx_.getOptions().abort_check_interval_millis;
if (abortInterval <= 0) {
return networkTimeout;
}
if (networkTimeout <= 0) {
return abortInterval;
}
return std::min(networkTimeout, abortInterval);
}
void WdtSocket::setSocketTimeouts() {
int readTimeout =
getEffectiveTimeout(threadCtx_.getOptions().read_timeout_millis);
if (readTimeout > 0) {
struct timeval tv;
tv.tv_sec = readTimeout / 1000; // milli to sec
tv.tv_usec = (readTimeout % 1000) * 1000; // milli to micro
if (setsockopt(fd_, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,
sizeof(struct timeval)) != 0) {
WPLOG(ERROR) << "Unable to set read timeout for " << port_ << " " << fd_;
}
}
int writeTimeout =
getEffectiveTimeout(threadCtx_.getOptions().write_timeout_millis);
if (writeTimeout > 0) {
struct timeval tv;
tv.tv_sec = writeTimeout / 1000; // milli to sec
tv.tv_usec = (writeTimeout % 1000) * 1000; // milli to micro
if (setsockopt(fd_, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
sizeof(struct timeval)) != 0) {
WPLOG(ERROR) << "Unable to set write timeout for " << port_ << " " << fd_;
}
}
}
void WdtSocket::setDscp(int dscp) {
if (dscp > 0) {
if (threadCtx_.getOptions().ipv6) {
int classval = dscp << 2;
if(setsockopt(fd_, IPPROTO_IPV6, IPV6_TCLASS, (char*)&classval,
sizeof(classval)) != 0) {
WPLOG(ERROR) << "Unable to set DSCP flag for " << port_ << " " << fd_;
}
}
if (threadCtx_.getOptions().ipv4) {
int ip_tos = dscp << 2;
if(setsockopt(fd_, IPPROTO_IP, IP_TOS, (char*)&ip_tos,
sizeof(ip_tos)) != 0) {
WPLOG(ERROR) << "Unable to set DSCP flag for " << port_ << " " << fd_;
}
}
}
}
/* static */
bool WdtSocket::getNameInfo(const struct sockaddr *sa, socklen_t salen,
std::string &host, std::string &port) {
char hostBuf[NI_MAXHOST], portBuf[NI_MAXSERV];
int res = getnameinfo(sa, salen, hostBuf, sizeof(hostBuf), portBuf,
sizeof(portBuf), NI_NUMERICHOST | NI_NUMERICSERV);
if (res) {
WLOG(ERROR) << "getnameinfo failed " << gai_strerror(res);
return false;
}
host = std::string(hostBuf);
port = std::string(portBuf);
return true;
}
int WdtSocket::getReceiveBufferSize() const {
int size;
socklen_t sizeSize = sizeof(size);
getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, (void *)&size, &sizeSize);
return size;
}
int WdtSocket::getSendBufferSize() const {
int size;
socklen_t sizeSize = sizeof(size);
getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, (void *)&size, &sizeSize);
return size;
}
void WdtSocket::resetEncryptor() {
encryptor_ = std::make_unique<AESEncryptor>();
}
void WdtSocket::resetDecryptor() {
decryptor_ = std::make_unique<AESDecryptor>();
}
int WdtSocket::getUnackedBytes() const {
#ifdef WDT_HAS_SOCKIOS_H
int numUnackedBytes;
int ret;
{
PerfStatCollector statCollector(threadCtx_, PerfStatReport::IOCTL);
ret = ::ioctl(fd_, SIOCOUTQ, &numUnackedBytes);
}
if (ret != 0) {
WPLOG(ERROR) << "Failed to get unacked bytes for socket " << fd_;
numUnackedBytes = -1;
}
return numUnackedBytes;
#else
WLOG(WARNING) << "Wdt has no way to determine unacked bytes for socket";
return -1;
#endif
}
WdtSocket::~WdtSocket() {
WVLOG(1) << "~WdtSocket " << port_ << " " << fd_;
closeNoCheck();
}
}
}