util/ThreadTransferHistory.cpp (244 lines of code) (raw):

/**
 * Copyright (c) 2014-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
#include <wdt/util/ThreadTransferHistory.h>
#include <wdt/Sender.h>

namespace facebook {
namespace wdt {

// Per-port record of the sources a sender thread has sent, used to return
// unacknowledged sources to the queue when a checkpoint is received.
ThreadTransferHistory::ThreadTransferHistory(DirectorySourceQueue &queue,
                                             TransferStats &threadStats,
                                             int32_t port)
    : queue_(queue), threadStats_(threadStats), port_(port) {
  WVLOG(1) << "Making thread history for port " << port_;
}

// Returns the identifier of the source stored at position `index` of the
// history. When `index` is out of range, logs a warning and returns an empty
// string.
std::string ThreadTransferHistory::getSourceId(int64_t index) {
  std::lock_guard<std::mutex> lock(mutex_);
  std::string sourceId;
  const int64_t historySize = history_.size();
  if (index >= 0 && index < historySize) {
    sourceId = history_[index]->getIdentifier();
  } else {
    WLOG(WARNING) << "Trying to read out of bounds data " << index << " "
                  << history_.size();
  }
  return sourceId;
}

// Records `source` as sent by this thread and returns true (ownership moves
// into the history).
// If a global checkpoint has already been received, the source is NOT
// recorded. Instead it is:
//   - marked as failed using the last checkpoint, which is then cleared so it
//     is applied to at most one such source;
//   - handed back to the queue;
// and false is returned.
bool ThreadTransferHistory::addSource(std::unique_ptr<ByteSource> &source) {
  std::lock_guard<std::mutex> lock(mutex_);
  if (globalCheckpoint_) {
    // already received an error for this thread
    WVLOG(1) << "adding source after global checkpoint is received. returning "
                "the source to the queue";
    markSourceAsFailed(source, lastCheckpoint_.get());
    lastCheckpoint_.reset();
    queue_.returnToQueue(source);
    return false;
  }
  history_.emplace_back(std::move(source));
  return true;
}

// Applies a checkpoint reported on this thread's own connection.
// Returns the status from setCheckpointAndReturnToQueue().
ErrorCode ThreadTransferHistory::setLocalCheckpoint(
    const Checkpoint &checkpoint) {
  std::lock_guard<std::mutex> lock(mutex_);
  return setCheckpointAndReturnToQueue(checkpoint, false);
}

// Applies a checkpoint that the controller routes to this port. Sets the
// globalCheckpoint_ flag (unless the checkpoint is rejected as invalid), then
// blocks until the owning thread calls markNotInUse().
ErrorCode ThreadTransferHistory::setGlobalCheckpoint(
    const Checkpoint &checkpoint) {
  std::unique_lock<std::mutex> lock(mutex_);
  ErrorCode status = setCheckpointAndReturnToQueue(checkpoint, true);
  // Re-test inUse_ after every wakeup: condition variables may wake
  // spuriously.
  while (inUse_) {
    // have to wait, error thread signalled through globalCheckpoint_ flag
    WLOG(INFO) << "Transfer history still in use, waiting, checkpoint "
               << checkpoint;
    conditionInUse_.wait(lock);
  }
  return status;
}

// Core checkpoint handling; every caller in this file holds mutex_.
// The first checkpoint.numBlocks history entries are treated as acknowledged.
// Every later entry is treated as failed and is:
//   - removed from the history;
//   - marked as failed (stats adjusted);
//   - returned to the queue.
// The checkpoint's partial-block information is applied only to the oldest
// failed entry, which is the last one popped.
// Returns INVALID_CHECKPOINT if numBlocks is out of range or the checkpoint
// fails validation; otherwise returns validateCheckpoint()'s result.
ErrorCode ThreadTransferHistory::setCheckpointAndReturnToQueue(
    const Checkpoint &checkpoint, bool globalCheckpoint) {
  const int64_t historySize = history_.size();
  int64_t numReceivedSources = checkpoint.numBlocks;
  int64_t lastBlockReceivedBytes = checkpoint.lastBlockReceivedBytes;
  if (numReceivedSources < 0) {
    // A negative block count would make numFailedSources exceed the history
    // size, and the loop below would then call back() on an empty container.
    WLOG(ERROR) << "checkpoint has a negative number of blocks "
                << numReceivedSources;
    return INVALID_CHECKPOINT;
  }
  if (numReceivedSources > historySize) {
    WLOG(ERROR)
        << "checkpoint is greater than total number of sources transferred "
        << history_.size() << " " << numReceivedSources;
    return INVALID_CHECKPOINT;
  }
  ErrorCode errCode = validateCheckpoint(checkpoint, globalCheckpoint);
  if (errCode == INVALID_CHECKPOINT) {
    return INVALID_CHECKPOINT;
  }
  // Sticky: once a global checkpoint is accepted, addSource() stops recording.
  globalCheckpoint_ |= globalCheckpoint;
  lastCheckpoint_ = std::make_unique<Checkpoint>(checkpoint);
  int64_t numFailedSources = historySize - numReceivedSources;
  if (numFailedSources == 0 && lastBlockReceivedBytes > 0) {
    if (!globalCheckpoint) {
      // no block to apply checkpoint offset. This can happen if we receive same
      // local checkpoint without adding anything to the history
      WLOG(WARNING) << "Local checkpoint has received bytes for last block, but "
                       "there are no unacked blocks in the history. Ignoring.";
    }
  }
  numAcknowledged_ = numReceivedSources;
  std::vector<std::unique_ptr<ByteSource>> sourcesToReturn;
  sourcesToReturn.reserve(numFailedSources);
  for (int64_t i = 0; i < numFailedSources; i++) {
    std::unique_ptr<ByteSource> source = std::move(history_.back());
    history_.pop_back();
    // Only the oldest unacked block (popped last) can be partially received.
    const Checkpoint *checkpointPtr =
        (i == numFailedSources - 1 ? &checkpoint : nullptr);
    markSourceAsFailed(source, checkpointPtr);
    sourcesToReturn.emplace_back(std::move(source));
  }
  queue_.returnToQueue(sourcesToReturn);
  WLOG(INFO) << numFailedSources
             << " number of sources returned to queue, checkpoint: "
             << checkpoint;
  return errCode;
}

// Moves the transfer stats out of every source and empties the history.
// All sources must already be acknowledged (enforced by WDT_CHECK).
// The stats are returned in reverse history order, newest source first.
std::vector<TransferStats> ThreadTransferHistory::popAckedSourceStats() {
  std::unique_lock<std::mutex> lock(mutex_);
  const int64_t historySize = history_.size();
  WDT_CHECK(numAcknowledged_ == historySize);
  std::vector<TransferStats> sourceStats;
  sourceStats.reserve(history_.size());
  while (!history_.empty()) {
    sourceStats.emplace_back(std::move(history_.back()->getTransferStats()));
    history_.pop_back();
  }
  return sourceStats;
}

// Treats every source currently in the history as acknowledged.
void ThreadTransferHistory::markAllAcknowledged() {
  std::unique_lock<std::mutex> lock(mutex_);
  numAcknowledged_ = history_.size();
}

// Returns every source past the acknowledged prefix to the queue. It does this
// by applying a local checkpoint whose numBlocks equals numAcknowledged_.
// The other checkpoint fields are default-constructed.
// The ErrorCode is not propagated; an invalid checkpoint is already logged
// inside setCheckpointAndReturnToQueue().
void ThreadTransferHistory::returnUnackedSourcesToQueue() {
  std::unique_lock<std::mutex> lock(mutex_);
  Checkpoint checkpoint;
  checkpoint.numBlocks = numAcknowledged_;
  setCheckpointAndReturnToQueue(checkpoint, false);
}

// Compares `checkpoint` against the previously accepted one, if any.
// Returns INVALID_CHECKPOINT when:
//   - numBlocks went backwards; or
//   - the same last block is reported with different received bytes.
// Returns NO_PROGRESS when a local checkpoint repeats the previous position,
// or reports a new block with zero received bytes.
// Returns OK otherwise.
ErrorCode ThreadTransferHistory::validateCheckpoint(
    const Checkpoint &checkpoint, bool globalCheckpoint) {
  if (lastCheckpoint_ == nullptr) {
    return OK;
  }
  if (checkpoint.numBlocks < lastCheckpoint_->numBlocks) {
    WLOG(ERROR) << "Current checkpoint must be higher than previous checkpoint, "
                   "Last checkpoint: "
                << *lastCheckpoint_ << ", Current checkpoint: " << checkpoint;
    return INVALID_CHECKPOINT;
  }
  if (checkpoint.numBlocks > lastCheckpoint_->numBlocks) {
    return OK;
  }
  bool noProgress = false;
  // numBlocks same
  if (checkpoint.lastBlockSeqId == lastCheckpoint_->lastBlockSeqId &&
      checkpoint.lastBlockOffset == lastCheckpoint_->lastBlockOffset) {
    // same block
    if (checkpoint.lastBlockReceivedBytes !=
        lastCheckpoint_->lastBlockReceivedBytes) {
      WLOG(ERROR) << "Current checkpoint has different received bytes, but all "
                     "other fields are same, Last checkpoint "
                  << *lastCheckpoint_ << ", Current checkpoint: " << checkpoint;
      return INVALID_CHECKPOINT;
    }
    noProgress = true;
  } else {
    // different block
    WDT_CHECK(checkpoint.lastBlockReceivedBytes >= 0);
    if (checkpoint.lastBlockReceivedBytes == 0) {
      noProgress = true;
    }
  }
  if (noProgress && !globalCheckpoint) {
    // we can get same global checkpoint multiple times, so no need to check for
    // progress
    WLOG(WARNING) << "No progress since last checkpoint, Last checkpoint: "
                  << *lastCheckpoint_ << ", Current checkpoint: " << checkpoint;
    return NO_PROGRESS;
  }
  return OK;
}

// Adjusts the per-source and per-thread stats for a source being re-queued,
// then advances the source's offset by the bytes the receiver reported.
// `checkpoint` may be null. When non-null, it is trusted in two cases:
//   - its seq-id and offset match this source; or
//   - it carries no seq-id (older receiver).
void ThreadTransferHistory::markSourceAsFailed(
    std::unique_ptr<ByteSource> &source, const Checkpoint *checkpoint) {
  auto &metadata = source->getMetaData();
  bool validCheckpoint = false;
  if (checkpoint != nullptr) {
    if (checkpoint->hasSeqId) {
      if ((checkpoint->lastBlockSeqId == metadata.seqId) &&
          (checkpoint->lastBlockOffset == source->getOffset())) {
        validCheckpoint = true;
      } else {
        WLOG(WARNING)
            << "Checkpoint block does not match history block. Checkpoint: "
            << checkpoint->lastBlockSeqId << ", " << checkpoint->lastBlockOffset
            << " History: " << metadata.seqId << ", " << source->getOffset();
      }
    } else {
      // Receiver at lower version!
      // checkpoint does not have seq-id. We have to blindly trust
      // lastBlockReceivedBytes. If we do not, transfer will fail because of
      // number of bytes mismatch. Even if an error happens because of this,
      // Receiver will fail.
      validCheckpoint = true;
    }
  }
  int64_t receivedBytes =
      (validCheckpoint ? checkpoint->lastBlockReceivedBytes : 0);
  TransferStats &sourceStats = source->getTransferStats();
  if (sourceStats.getLocalErrorCode() != OK) {
    // already marked as failed
    sourceStats.addEffectiveBytes(0, receivedBytes);
    threadStats_.addEffectiveBytes(0, receivedBytes);
  } else {
    // First failure of this source: undo the bytes that were counted as
    // effective but not actually received, and record the failed attempt.
    auto dataBytes = source->getSize();
    auto headerBytes = sourceStats.getEffectiveHeaderBytes();
    int64_t wastedBytes = dataBytes - receivedBytes;
    sourceStats.subtractEffectiveBytes(headerBytes, wastedBytes);
    sourceStats.decrNumBlocks();
    sourceStats.setLocalErrorCode(SOCKET_WRITE_ERROR);
    sourceStats.incrFailedAttempts();

    threadStats_.subtractEffectiveBytes(headerBytes, wastedBytes);
    threadStats_.decrNumBlocks();
    threadStats_.incrFailedAttempts();
  }
  source->advanceOffset(receivedBytes);
}

// Returns whether a global checkpoint has been accepted for this thread.
bool ThreadTransferHistory::isGlobalCheckpointReceived() {
  std::lock_guard<std::mutex> lock(mutex_);
  return globalCheckpoint_;
}

// Clears inUse_ and wakes any thread blocked in setGlobalCheckpoint().
void ThreadTransferHistory::markNotInUse() {
  std::lock_guard<std::mutex> lock(mutex_);
  inUse_ = false;
  conditionInUse_.notify_all();
}

// Owns one ThreadTransferHistory per sender port.
TransferHistoryController::TransferHistoryController(
    DirectorySourceQueue &dirQueue)
    : dirQueue_(dirQueue) {
}

// Returns the history registered for `port`; fails a WDT_CHECK if none exists.
ThreadTransferHistory &TransferHistoryController::getTransferHistory(
    int32_t port) {
  auto it = threadHistoriesMap_.find(port);
  WDT_CHECK(it != threadHistoriesMap_.end()) << "port not found " << port;
  return *(it->second.get());
}

// Registers a new history for `port`. NOTE(review): emplace() keeps the
// existing entry if `port` was already registered.
void TransferHistoryController::addThreadHistory(int32_t port,
                                                 TransferStats &threadStats) {
  WVLOG(1) << "Adding the history for " << port;
  threadHistoriesMap_.emplace(port, std::make_unique<ThreadTransferHistory>(
                                        dirQueue_, threadStats, port));
}

// After a VERSION_MISMATCH abort, returns every history's sources to the
// queue. Returns ERROR as soon as a history is found with acknowledged blocks;
// histories visited earlier have already been returned by then.
ErrorCode TransferHistoryController::handleVersionMismatch() {
  for (auto &historyPair : threadHistoriesMap_) {
    auto &history = historyPair.second;
    if (history->getNumAcked() > 0) {
      WLOG(ERROR)
          << "Even though the transfer aborted due to VERSION_MISMATCH, "
             "some blocks got acked by the receiver, port "
          << historyPair.first << " numAcked " << history->getNumAcked();
      return ERROR;
    }
    history->returnUnackedSourcesToQueue();
  }
  return OK;
}

// Routes a global checkpoint to the history for checkpoint.port. Logs an
// error and does nothing if no history exists for that port. The status
// returned by setGlobalCheckpoint() is not propagated.
void TransferHistoryController::handleGlobalCheckpoint(
    const Checkpoint &checkpoint) {
  auto errPort = checkpoint.port;
  auto it = threadHistoriesMap_.find(errPort);
  if (it == threadHistoriesMap_.end()) {
    WLOG(ERROR) << "Invalid checkpoint " << checkpoint
                << ". No sender thread running on port " << errPort;
    return;
  }
  WVLOG(1) << "received global checkpoint " << checkpoint;
  it->second->setGlobalCheckpoint(checkpoint);
}
}
}