SenderThread.h (107 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/Conv.h>
#include <wdt/Sender.h>
#include <wdt/WdtThread.h>
#include <wdt/util/IClientSocket.h>
#include <wdt/util/ThreadTransferHistory.h>
#include <thread>
namespace facebook {
namespace wdt {
class DirectorySourceQueue;
/// state machine states
enum SenderState {
CONNECT,
READ_LOCAL_CHECKPOINT,
SEND_SETTINGS,
SEND_BLOCKS,
SEND_DONE_CMD,
SEND_SIZE_CMD,
CHECK_FOR_ABORT,
READ_FILE_CHUNKS,
READ_RECEIVER_CMD,
PROCESS_DONE_CMD,
PROCESS_WAIT_CMD,
PROCESS_ERR_CMD,
PROCESS_ABORT_CMD,
PROCESS_VERSION_MISMATCH,
END
};
/**
* This class represents one sender thread. It contains all the
* functionalities that a thread would need to send data over
* a connection to the receiver.
* All the sender threads share bunch of modules like directory queue,
* throttler, threads controller etc
*/
class SenderThread : public WdtThread {
public:
/// Identifers for the barriers used in the thread
enum SENDER_BARRIERS { VERSION_MISMATCH_BARRIER, NUM_BARRIERS };
/// Identifiers for the funnels used in the thread
enum SENDER_FUNNELS { VERSION_MISMATCH_FUNNEL, NUM_FUNNELS };
/// Identifier for the condition wrappers used in the thread
enum SENDER_CONDITIONS { NUM_CONDITIONS };
/// abort checker passed to client sockets. This checks both global sender
/// abort and whether global checkpoint has been received or not
class SocketAbortChecker : public IAbortChecker {
public:
explicit SocketAbortChecker(SenderThread *threadPtr)
: threadPtr_(threadPtr) {
}
bool shouldAbort() const override {
return (threadPtr_->getThreadAbortCode() != OK);
}
private:
SenderThread *threadPtr_{nullptr};
};
/// Constructor for the sender thread
SenderThread(Sender *sender, int threadIndex, int32_t port,
ThreadsController *threadsController)
: WdtThread(sender->options_, threadIndex, port,
sender->getProtocolVersion(), threadsController),
wdtParent_(sender),
dirQueue_(sender->dirQueue_.get()),
transferHistoryController_(sender->transferHistoryController_.get()) {
controller_->registerThread(threadIndex_);
transferHistoryController_->addThreadHistory(port_, threadStats_);
threadAbortChecker_ = std::make_unique<SocketAbortChecker>(this);
threadCtx_->setAbortChecker(threadAbortChecker_.get());
threadStats_.setId(folly::to<std::string>(threadIndex_));
isTty_ = isatty(STDERR_FILENO);
}
typedef SenderState (SenderThread::*StateFunction)();
/// Returns the neogtiated protocol
int getNegotiatedProtocol() const override;
/// Steps to do ebfore calling start
ErrorCode init() override;
/// Reset the sender thread
void reset() override;
/// Get the port sender thread is connecting to
int getPort() const override;
/// returns current abort code. checks for both global abort and abort due to
/// receive of global checkpoint
ErrorCode getThreadAbortCode();
/// Destructor of the sender thread
~SenderThread() override {
}
private:
/// Overloaded operator for printing thread info
friend std::ostream &operator<<(std::ostream &os,
const SenderThread &senderThread);
/// Parent shared among all the threads for meta information
Sender *wdtParent_;
/// sets the correct footer type depending on the checksum and encryption type
void setFooterType();
/// The main entry point of the thread
void start() override;
/// Get the local transfer history
ThreadTransferHistory &getTransferHistory() {
return transferHistoryController_->getTransferHistory(port_);
}
/**
* tries to connect to the receiver
* Previous states : Almost all states(in case of network errors, all states
* move to this state)
* Next states : SEND_SETTINGS(if there is no previous error)
* READ_LOCAL_CHECKPOINT(if there is previous error)
* END(failed)
*/
SenderState connect();
/**
* tries to read local checkpoint and return unacked sources to queue. If the
* checkpoint value is -1, then we know previous attempt to send DONE had
* failed. So, we move to READ_RECEIVER_CMD state.
* Previous states : CONNECT
* Next states : CONNECT(read failure),
* END(protocol error or global checkpoint found),
* READ_RECEIVER_CMD(if checkpoint is -1),
* SEND_SETTINGS(success)
*/
SenderState readLocalCheckPoint();
/**
* sends sender settings to the receiver
* Previous states : READ_LOCAL_CHECKPOINT,
* CONNECT
* Next states : SEND_BLOCKS(success),
* CONNECT(failure)
*/
SenderState sendSettings();
/**
* sends blocks to receiver till the queue is not empty. After transferring a
* block, we add it to the history. While adding to history, if it is found
* that global checkpoint has been received for this thread, we move to END
* state.
* Previous states : SEND_SETTINGS,
* PROCESS_ERR_CMD
* Next states : SEND_BLOCKS(success),
* END(global checkpoint received),
* CHECK_FOR_ABORT(socket write failure),
* SEND_DONE_CMD(no more blocks left to transfer)
*/
SenderState sendBlocks();
/**
* sends DONE cmd to the receiver
* Previous states : SEND_BLOCKS
* Next states : CONNECT(failure),
* READ_RECEIVER_CMD(success)
*/
SenderState sendDoneCmd();
/**
* sends size cmd to the receiver
* Previous states : SEND_BLOCKS
* Next states : CHECK_FOR_ABORT(failure),
* SEND_BLOCKS(success)
*/
SenderState sendSizeCmd();
/**
* checks to see if the receiver has sent ABORT or not
* Previous states : SEND_BLOCKS,
* SEND_DONE_CMD
* Next states : CONNECT(no ABORT cmd),
* END(protocol error),
* PROCESS_ABORT_CMD(read ABORT cmd)
*/
SenderState checkForAbort();
/**
* reads previously transferred file chunks list. If it receives an ACK cmd,
* then it moves on. If wait cmd is received, it waits. Otherwise reads the
* file chunks and when done starts directory queue thread.
* Previous states : SEND_SETTINGS,
* Next states: READ_FILE_CHUNKS(if wait cmd is received),
* CHECK_FOR_ABORT(network error),
* END(protocol error),
* SEND_BLOCKS(success)
*
*/
SenderState readFileChunks();
/**
* reads receiver cmd
* Previous states : SEND_DONE_CMD
* Next states : PROCESS_DONE_CMD,
* PROCESS_WAIT_CMD,
* PROCESS_ERR_CMD,
* END(protocol error),
* CONNECT(failure)
*/
SenderState readReceiverCmd();
/**
* handles DONE cmd
* Previous states : READ_RECEIVER_CMD
* Next states : END
*/
SenderState processDoneCmd();
/**
* handles WAIT cmd
* Previous states : READ_RECEIVER_CMD
* Next states : READ_RECEIVER_CMD
*/
SenderState processWaitCmd();
/**
* reads list of global checkpoints and returns unacked sources to queue.
* Previous states : READ_RECEIVER_CMD
* Next states : CONNECT(socket read failure)
* END(checkpoint list decode failure),
* SEND_BLOCKS(success)
*/
SenderState processErrCmd();
/**
* processes ABORT cmd
* Previous states : CHECK_FOR_ABORT,
* READ_RECEIVER_CMD
* Next states : END
*/
SenderState processAbortCmd();
/**
* waits for all active threads to be aborted, checks to see if the abort was
* due to version mismatch. Also performs various sanity checks.
* Previous states : Almost all threads, abort flags is checked between every
* state transition
* Next states : CONNECT(Abort was due to version kismatch),
* END(if abort was not due to version mismatch or some sanity
* check failed)
*/
SenderState processVersionMismatch();
/**
* Reads next receiver cmd. If the read times out, checks to see if the tcp
* unacked bytes have decreased or not
*
* @return status of the read
*/
ErrorCode readNextReceiverCmd();
/**
* Reads and verifies spurious extra checkpoint. Receiver can insert extra
* checkpoint in case some bad client connected to it.
*
* @return status of read/verification
*/
ErrorCode readAndVerifySpuriousCheckpoint();
/// General utility used by sender threads to connect to receiver
std::unique_ptr<IClientSocket> connectToReceiver(
int port, IAbortChecker const *abortChecker, ErrorCode &errCode);
/// Method responsible for sending one source to the destination
TransferStats sendOneByteSource(const std::unique_ptr<ByteSource> &source,
ErrorCode transferStatus);
/// checks to see if heart-beat is enabled, and if it is time to read
/// heart-beats, and if yes, reads heart-beats
ErrorCode readHeartBeats();
/// mapping from sender states to state functions
static const StateFunction stateMap_[];
/// whether stderr is tty
bool isTty_{false};
/// Negotiated protocol of the sender thread
int negotiatedProtocol_{-1};
/// Pointer to client socket to maintain connection to the receiver
/// Note: this must be deleted while still on the same thread it was created
/// (for fbonly / btm / thrift / eventbase reasons)
std::unique_ptr<IClientSocket> socket_;
/// whether total file size has been sent to the receiver
bool totalSizeSent_{false};
/// number of consecutive reconnects without any progress
int numReconnectWithoutProgress_{0};
/// Point to the directory queue of parent sender
DirectorySourceQueue *dirQueue_;
/// abort checker to use for this thread
std::unique_ptr<IAbortChecker> threadAbortChecker_{nullptr};
/// Thread history controller shared across all threads
TransferHistoryController *transferHistoryController_;
};
}
}