util/ThreadTransferHistory.h (59 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/SpinLock.h>
#include <wdt/Protocol.h>
#include <wdt/Reporting.h>
#include <wdt/util/DirectorySourceQueue.h>
#include <vector>
namespace facebook {
namespace wdt {
/// Transfer history of a sender thread
class ThreadTransferHistory {
public:
/**
* @param queue directory queue
* @param threadStats stat object of the thread
*/
ThreadTransferHistory(DirectorySourceQueue &queue, TransferStats &threadStats,
int32_t port);
/**
* @param index of the source
* @return if index is in bounds, returns the identifier for the
* source, else returns empty string
*/
std::string getSourceId(int64_t index);
/**
* Adds the source to the history. If global checkpoint has already been
* received, then the source is returned to the queue.
*
* @param source source to be added to history
* @return true if added to history, false if not added due to a
* global checkpoint
*/
bool addSource(std::unique_ptr<ByteSource> &source);
/**
* Sets checkpoint. Also, returns unacked sources to queue
* @param checkpoint checkpoint received
* @param globalCheckpoint global or local checkpoint
* @return number of sources returned to queue
*/
ErrorCode setLocalCheckpoint(const Checkpoint &checkpoint);
/**
* @return stats for acked sources, must be called after all the
* unacked sources are returned to the queue
*/
std::vector<TransferStats> popAckedSourceStats();
/// marks all the sources as acked
void markAllAcknowledged();
/**
* returns all unacked sources to the queue
*/
void returnUnackedSourcesToQueue();
/**
* @return number of sources acked by the receiver
*/
int64_t getNumAcked() const {
return numAcknowledged_;
}
/// @return whether global checkpoint has been received or not
bool isGlobalCheckpointReceived();
/// Clears the inUse_ flag and notifies other waiting threads
void markNotInUse();
/// Copy constructor deleted
ThreadTransferHistory(const ThreadTransferHistory &that) = delete;
/// Delete the assignment operatory by copy
ThreadTransferHistory &operator=(const ThreadTransferHistory &that) = delete;
private:
friend class TransferHistoryController;
/// Validates a checkpoint and returns the status
ErrorCode validateCheckpoint(const Checkpoint &checkpoint,
bool globalCheckpoint);
/**
* Sets global checkpoint. If the history is still in use, waits for the
* error thread to add current block to the history.
*
* @param checkpoint checkpoint received
*
* @return status of the operation
*/
ErrorCode setGlobalCheckpoint(const Checkpoint &checkpoint);
void markSourceAsFailed(std::unique_ptr<ByteSource> &source,
const Checkpoint *checkpoint);
/**
* Sets checkpoint. Also, returns unacked sources to queue
*
* @param checkpoint checkpoint received
* @param globalCheckpoint global or local checkpoint
* @return status of sources returned to queue
*/
ErrorCode setCheckpointAndReturnToQueue(const Checkpoint &checkpoint,
bool globalCheckpoint);
/// reference to global queue
DirectorySourceQueue &queue_;
/// reference to thread stats
TransferStats &threadStats_;
/// history of the thread
std::vector<std::unique_ptr<ByteSource>> history_;
/// whether a global error checkpoint has been received or not
bool globalCheckpoint_{false};
/// number of sources acked by the receiver thread
int64_t numAcknowledged_{0};
/// last received checkpoint
std::unique_ptr<Checkpoint> lastCheckpoint_{nullptr};
/// Port assosciated with the history
int32_t port_;
/// whether the owner thread is still using this
bool inUse_{true};
/// Mutex used by history internally for synchronization
std::mutex mutex_;
/// Condition variable to signify the history being in use
std::condition_variable conditionInUse_;
};
/// Controller for history across the sender threads
class TransferHistoryController {
public:
/**
* Constructor for the history controller
* @param dirQueue Directory queue used by the sender
*/
explicit TransferHistoryController(DirectorySourceQueue &dirQueue);
/**
* Add transfer history for a thread
* @param port Port being used by the sender thread
* @param threadStats Thread stats for the sender thread
*/
void addThreadHistory(int32_t port, TransferStats &threadStats);
/// Get transfer history for a thread using its port number
ThreadTransferHistory &getTransferHistory(int32_t port);
/// Handle version mismatch across the histories of all thread
ErrorCode handleVersionMismatch();
/// Handle checkpoint that was sent by the receiver
void handleGlobalCheckpoint(const Checkpoint &checkpoint);
private:
/// Reference to the directory queue being used by the sender
DirectorySourceQueue &dirQueue_;
/// Map of port (used by sender threads) and transfer history
std::unordered_map<int32_t, std::unique_ptr<ThreadTransferHistory>>
threadHistoriesMap_;
};
}
}