ReceiverThread.h (80 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <wdt/Receiver.h>
#include <wdt/WdtBase.h>
#include <wdt/WdtThread.h>
#include <wdt/util/ServerSocket.h>
namespace facebook {
namespace wdt {
class Receiver;
/**
 * The wdt receiver has logic to maintain the consistency of
 * transfers across connection errors. Every receiver thread is driven by
 * a state machine. These are all the states of that state machine.
 */
enum ReceiverState {
  // NOTE(review): the numeric values are implicit (0, 1, 2, ...) and
  // presumably index ReceiverThread::stateMap_; keep the order of the
  // enumerators unchanged unless that table is updated too - confirm.

  /// Start state: try to listen/bind on the port
  LISTEN,
  /// Accept the first connection of a new session
  ACCEPT_FIRST_CONNECTION,
  /// Accept a connection, bounded by a timeout
  ACCEPT_WITH_TIMEOUT,
  /// Send the local checkpoint to the sender
  SEND_LOCAL_CHECKPOINT,
  /// Read the next command and move to the matching state
  READ_NEXT_CMD,
  /// Process a file command
  PROCESS_FILE_CMD,
  /// Process a settings command
  PROCESS_SETTINGS_CMD,
  /// Process a done command
  PROCESS_DONE_CMD,
  /// Process a size command
  PROCESS_SIZE_CMD,
  /// Send file chunks received in a previous transfer (download resumption)
  SEND_FILE_CHUNKS,
  /// Send global checkpoints to the sender
  SEND_GLOBAL_CHECKPOINTS,
  /// Send DONE to the sender and try to read back the ack
  SEND_DONE_CMD,
  /// Send ABORT back to the sender
  SEND_ABORT_CMD,
  /// Wait for the transfer to finish or for new checkpoints to show up
  WAIT_FOR_FINISH_OR_NEW_CHECKPOINT,
  /// The thread hit an error; wait for the session to end
  FINISH_WITH_ERROR,
  /// The session is over for this thread (in long running mode the thread
  /// goes back to ACCEPT_FIRST_CONNECTION from here)
  END
};
/**
 * This class represents a receiver thread. It contains
 * all the logic for a thread to bind to a port and
 * receive data from the wdt sender. All receiver threads
 * share modules such as the threads controller and the throttler.
 */
class ReceiverThread : public WdtThread {
public:
/// Identifiers for the funnels that this thread will use
enum RECEIVER_FUNNELS { SEND_FILE_CHUNKS_FUNNEL, NUM_FUNNELS };
/// Identifiers for the condition variable wrappers used in the thread
enum RECEIVER_CONDITIONS { WAIT_FOR_FINISH_OR_CHECKPOINT_CV, NUM_CONDITIONS };
/// Identifiers for the barriers used in the thread
enum RECEIVER_BARRIERS { NUM_BARRIERS };
/**
* Constructor for receiver thread.
* @param wdtParent Pointer back to the parent receiver for meta
* information
* @param threadIndex Every thread is identified by unique index
* @param port Port this thread will listen on
* @param controller Thread controller for all the instances of the
* receiver threads. All the receiver thread objects
* need to share the same instance of the controller
*/
ReceiverThread(Receiver *wdtParent, int threadIndex, int port,
ThreadsController *controller);
/// Initializes the receiver thread before starting
ErrorCode init() override;
/**
* In long running mode, we need to reset thread variables after each
* session. Before starting each session, reset() has to called to do that.
*/
void reset() override;
/// Destructor of Receiver thread
~ReceiverThread() override;
/// Get the port this receiver thread is listening on
int32_t getPort() const override;
private:
/// Overloaded operator for printing thread info
friend std::ostream &operator<<(std::ostream &os,
const ReceiverThread &receiverThread);
typedef ReceiverState (ReceiverThread::*StateFunction)();
/// Parent shared among all the threads for meta information
Receiver *wdtParent_;
/**
* Tries to listen/bind to port. If this fails, thread is considered failed.
* Previous states : n/a (start state)
* Next states : ACCEPT_FIRST_CONNECTION(success),
* FINISH_WITH_ERROR(failure)
*/
ReceiverState listen();
/**
* Tries to accept first connection of a new session. Periodically checks
* whether a new session has started or not. If a new session has started then
* goes to ACCEPT_WITH_TIMEOUT state. Also does session initialization. In
* joinable mode, tries to accept for a limited number of user specified
* retries.
* Previous states : LISTEN,
* END(if in long running mode)
* Next states : ACCEPT_WITH_TIMEOUT(if a new transfer has started and this
* thread has not received a connection),
* FINISH_WITH_ERROR(if did not receive a
* connection in specified number of retries),
* READ_NEXT_CMD(if a connection was received)
*/
ReceiverState acceptFirstConnection();
/**
* Tries to accept a connection with timeout. There are 2 kinds of timeout. At
* the beginning of the session, it uses accept window as the timeout. Later
* when sender settings are known it uses max(readTimeOut, writeTimeout)) +
* buffer(500) as the timeout.
* Previous states : Almost all states(for any network errors during transfer,
* we transition to this state),
* Next states : READ_NEXT_CMD(if there are no previous errors and accept
* was successful),
* SEND_LOCAL_CHECKPOINT(if there were previous errors and
* accept was successful),
* FINISH_WITH_ERROR(if accept failed and
* transfer previously failed during SEND_DONE_CMD state. Thus
* case needs special handling to ensure that we do not mess up
* session variables),
* END(if accept fails otherwise)
*/
ReceiverState acceptWithTimeout();
/**
* Sends local checkpoint to the sender. In case of previous error during
* SEND_LOCAL_CHECKPOINT state, we send -1 as the checkpoint.
* Previous states : ACCEPT_WITH_TIMEOUT
* Next states : ACCEPT_WITH_TIMEOUT(if sending fails),
* SEND_DONE_CMD(if send is successful and we have previous
* SEND_DONE_CMD error),
* READ_NEXT_CMD(if send is successful otherwise)
*/
ReceiverState sendLocalCheckpoint();
/**
* Reads next cmd and transitions to the state accordingly.
* Previous states : SEND_LOCAL_CHECKPOINT,
* ACCEPT_FIRST_CONNECTION,
* ACCEPT_WITH_TIMEOUT,
* PROCESS_SETTINGS_CMD,
* PROCESS_FILE_CMD,
* SEND_GLOBAL_CHECKPOINTS,
* Next states : PROCESS_FILE_CMD,
* PROCESS_DONE_CMD,
* PROCESS_SETTINGS_CMD,
* PROCESS_SIZE_CMD,
* ACCEPT_WITH_TIMEOUT(in case of read failure),
* FINISH_WITH_ERROR(in case of protocol errors)
*/
ReceiverState readNextCmd();
/**
* Processes file cmd. Logic of how we write the file to the destination
* directory is defined here.
* Previous states : READ_NEXT_CMD
* Next states : READ_NEXT_CMD(success),
* FINISH_WITH_ERROR(protocol error),
* ACCEPT_WITH_TIMEOUT(socket read failure)
*/
ReceiverState processFileCmd();
/**
* Processes settings cmd. Settings has a connection settings,
* protocol version, transfer id, etc. For more info check Protocol.h
* Previous states : READ_NEXT_CMD,
* Next states : READ_NEXT_CMD(success),
* FINISH_WITH_ERROR(protocol error),
* ACCEPT_WITH_TIMEOUT(socket read failure),
* SEND_FILE_CHUNKS(If the sender wants to resume transfer)
*/
ReceiverState processSettingsCmd();
/**
* Processes done cmd. Also checks to see if there are any new global
* checkpoints or not
* Previous states : READ_NEXT_CMD,
* Next states : FINISH_WITH_ERROR(protocol error),
* WAIT_FOR_FINISH_OR_NEW_CHECKPOINT(success),
* SEND_GLOBAL_CHECKPOINTS(if there are global errors)
*/
ReceiverState processDoneCmd();
/**
* Processes size cmd. Sets the value of totalSenderBytes_
* Previous states : READ_NEXT_CMD,
* Next states : READ_NEXT_CMD(success),
* FINISH_WITH_ERROR(protocol error)
*/
ReceiverState processSizeCmd();
/**
* Sends file chunks that were received successfully in any previous transfer,
* this is the first step in download resumption.
* Checks to see if they have already been transferred or not.
* If yes, send ACK. If some other thread is sending it, sends wait cmd
* and checks again later. Otherwise, breaks the entire data into bufferSIze_
* chunks and sends it.
* Previous states: PROCESS_SETTINGS_CMD,
* Next states : ACCEPT_WITH_TIMEOUT(network error),
* READ_NEXT_CMD(success)
*/
ReceiverState sendFileChunks();
/**
* Sends global checkpoints to sender
* Previous states : PROCESS_DONE_CMD,
* FINISH_WITH_ERROR
* Next states : READ_NEXT_CMD(success),
* ACCEPT_WITH_TIMEOUT(socket write failure)
*/
ReceiverState sendGlobalCheckpoint();
/**
* Sends DONE to sender, also tries to read back ack. If anything fails during
* this state, doneSendFailure_ thread variable is set. This flag makes the
* state machine behave differently, effectively bypassing all session related
* things.
* Previous states : SEND_LOCAL_CHECKPOINT,
* FINISH_WITH_ERROR
* Next states : END(success),
* ACCEPT_WITH_TIMEOUT(failure)
*/
ReceiverState sendDoneCmd();
/**
* Sends ABORT cmd back to the sender
* Previous states : PROCESS_FILE_CMD
* Next states : FINISH_WITH_ERROR
*/
ReceiverState sendAbortCmd();
/**
* Internal implementation of waitForFinishOrNewCheckpoint
* Returns :
* SEND_GLOBAL_CHECKPOINTS if there are checkpoints
* SEND_DONE_CMD if there are no checkpoints and
* there are no active threads
* WAIT_FOR_FINISH_OR_NEW_CHECKPOINT in all other cases
*/
ReceiverState checkForFinishOrNewCheckpoints();
/**
* Waits for transfer to finish or new checkpoints. This state first
* increments waitingThreadCount_. Then, it
* waits till all the threads have finished. It sends periodic WAIT signal to
* prevent sender from timing out. If a new checkpoint is found, we move to
* SEND_GLOBAL_CHECKPOINTS state.
* Previous states : PROCESS_DONE_CMD
* Next states : SEND_DONE_CMD(all threads finished),
* SEND_GLOBAL_CHECKPOINTS(if new checkpoints are found),
* ACCEPT_WITH_TIMEOUT(if socket write fails)
*/
ReceiverState waitForFinishOrNewCheckpoint();
/**
* Waits for transfer to finish. Only called when there is an error for the
* thread. It adds a checkpoint to the global list of checkpoints if a
* connection was received. It increments waitingWithErrorThreadCount_ and
* waits till the session ends.
* Previous states : Almost all states
* Next states : END
*/
ReceiverState finishWithError();
/// marks a block a verified
void markBlockVerified(const BlockDetails &blockDetails);
/// verifies received blocks which are not already verified
void markReceivedBlocksVerified();
/// checks whether heart-beat is enabled, and whether it is time to send
/// another heart-beat, and if yes, sends a heart-beat
void sendHeartBeat();
/// Mapping from receiver states to state functions
static const StateFunction stateMap_[];
/// Main entry point for the thread, starts the state machine
void start() override;
/**
* Server socket object that provides functionality such as listen()
* accept, read, write on the socket
*/
std::unique_ptr<IServerSocket> socket_{nullptr};
/// Marks the number of bytes already read in the buffer
int64_t numRead_{0};
/// Following two are markers to mark how much data has been read/parsed
int64_t off_{0};
int64_t oldOffset_{0};
/// Number of checkpoints already transferred
int checkpointIndex_{0};
/// Checkpoints saved for this thread
std::vector<Checkpoint> checkpoints_;
/**
* Pending value of checkpoint count. since write call success does not
* gurantee actual transfer, we do not apply checkpoint count update after
* the write. Only after receiving next cmd from sender, we apply the
* update
*/
int pendingCheckpointIndex_{0};
/// read timeout for sender
int64_t senderReadTimeout_{-1};
/// write timeout for sender
int64_t senderWriteTimeout_{-1};
/// whether the transfer is in block mode or not
bool isBlockMode_{true};
/// Checkpoint local to the thread, updated regularly
Checkpoint checkpoint_;
/// whether settings have been received and verified for the current
/// connection. This is used to determine round robin order for polling in
/// the server socket
bool curConnectionVerified_{false};
/// Checkpoints that have not been sent back to the sender
std::vector<Checkpoint> newCheckpoints_;
/// list of received blocks which have not yet been verified
std::vector<BlockDetails> blocksWaitingVerification_;
};
}
}