host/cxpslib/client.h (2,340 lines of code) (raw):
/// \file client.h
///
/// \brief for interacting with the cx process server
///
/// you can create as many client objects as you want. Each can run in a separate thread,
/// each issuing their own requests. However a given client object is not thread safe and
/// can only issue one request at time. It must finish the request or abort the request
/// before issuing another request.
///
/// Note: "this->" is used in serveral places to resolve template inheritance name look up dependencies
/// could have used "BaseClass<typename>::" (where BaseClass is the class that has the function and typename is the correct type)
/// or could use "using BaseClass<typename>" in the functions needing. "this->" seemed simplest solution
#ifndef CLIENT_H
#define CLIENT_H
#include <string>
#include <sstream>
#include <stdexcept>
#include <cstddef>
#include <vector>
#include <ctime>
#include <algorithm>
#include <utility>
#include <boost/lexical_cast.hpp>
#include <boost/asio.hpp>
#include <boost/tokenizer.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/bind.hpp>
#include "protocoltraits.h"
#include "connection.h"
#include "authentication.h"
#include "errorexception.h"
#include "extendedlengthpath.h"
#include "fio.h"
#include "compressmode.h"
#include "genrandnonce.h"
#include "listfile.h"
#include "finddelete.h"
#include "renamefinal.h"
#include "writemode.h"
#include "scopeguard.h"
#include "clientmajor.h"
#include "strutils.h"
#include "ResponseData.h"
#include "ClientCode.h"
#include "throttlingexception.h"
#include "Telemetry/TelemetrySharedParams.h"
bool const KEEP_ALIVE = true;
int const HEARTBEAT_INTERVAL_SECONDS(180);
typedef boost::tokenizer<boost::char_separator<char> > tokenizer_t;
typedef std::map<std::string, std::string> mapHeaders_t;
static const mapHeaders_t s_emptyHeader;
/// \brief abstract client for interacting with the cx process server
class ClientAbc {
public:
/// \brief various states the client can be in
enum ClientState {
CLIENT_STATE_NEEDS_TO_CONNECT, ///< client needs to issue a connect
CLIENT_STATE_NEEDS_TO_LOGIN, ///< client needs to issue a login request
CLIENT_STATE_IDLE, ///< client is connected, logged in, but not processing any requests
CLIENT_STATE_REQUEST_STARTED, ///< client has started a request
CLIENT_STATE_READING_REPLY, ///< client is reading the reply for a given request
CLIENT_STATE_MORE_DATA ///< client needs to read more data
};
typedef boost::shared_ptr<ClientAbc> ptr;
explicit ClientAbc(int writeMode = WRITE_MODE_NORMAL)
: m_writeMode(writeMode),
m_reqId(0)
{
resetResponseData();
}
virtual ~ClientAbc() {}
void resetResponseData()
{
m_responseData.data.clear();
m_responseData.headers.clear();
m_responseData.uriParams.clear();
// always set this as error so that if there is any exception thrown,
// it is treated as failure.
m_responseData.responseCode = CLIENT_RESULT_ERROR;
}
/// \brief abort an in progess request
virtual void abortRequest(bool disconnectOnly = false) = 0;
/// \brief issue put file request to send data to a remote file
///
/// \sa BasicClient::putFile for details
virtual void putFile(std::string const& remoteName,
std::size_t dataSize,
char const * data,
bool moreData,
COMPRESS_MODE compressMode,
bool createDirs = false,
long long offset = PROTOCOL_DO_NOT_SEND_OFFSET
) = 0;
/// \brief issue put file request to send data to a remote file
///
/// \sa BasicClient::putFile for details
virtual void putFile(std::string const& remoteName,
std::size_t dataSize,
char const * data,
bool moreData,
COMPRESS_MODE compressMode,
mapHeaders_t const & headers,
bool createDirs = false,
long long offset = PROTOCOL_DO_NOT_SEND_OFFSET
) = 0;
/// \brief issue put file request to send a local file to a remote file
///
/// \sa BasicClient::putFile for details
virtual void putFile(std::string const& remoteName,
std::string const& localName,
COMPRESS_MODE compressMode,
bool createDirs = false
) = 0;
/// \brief issue put file request to send a local file to a remote file
///
/// \sa BasicClient::putFile for details
virtual void putFile(std::string const& remoteName,
std::string const& localName,
COMPRESS_MODE compressMode,
mapHeaders_t const & headers,
bool createDirs = false
) = 0;
/// \brief issue rename file request
///
/// \sa BasicClient::renameFile for details
virtual ClientCode renameFile(std::string const& oldName,
std::string const& newName,
COMPRESS_MODE compressMode,
std::string const& finalPaths = std::string()) = 0;
/// \brief issue rename file request
///
/// \sa BasicClient::renameFile for details
virtual ClientCode renameFile(std::string const& oldName,
std::string const& newName,
COMPRESS_MODE compressMode,
mapHeaders_t const& headers,
std::string const& finalPaths = std::string()) = 0;
/// \brief issue delete file request
///
/// \sa BasicClient::deleteFile for details
virtual ClientCode deleteFile(std::string const& names, int mode = FindDelete::FILES_ONLY) = 0;
/// \brief issue delete file request
///
/// \sa BasicClient::deleteFile for details
virtual ClientCode deleteFile(std::string const& names, std::string const& fileSpece, int mode = FindDelete::FILES_ONLY) = 0;
/// \brief issue list file request
///
/// \sa BasicClient::listFile for details
virtual ClientCode listFile(std::string const& fileSpec,
std::string & files) = 0;
/// \brief issue get file request to get a remote file to a buffer
///
/// \sa BasicClient::getFile for details
virtual ClientCode getFile(std::string const& name,
std::size_t dataSize,
char * data,
std::size_t& bytesReturned) = 0;
/// \brief issue get file request to get remote file data in ragne from starting offset upto dataSize length to a buffer
///
/// \sa BasicClient::getFile for details
virtual ClientCode getFile(std::string const& name,
size_t offset,
std::size_t dataSize,
char * data,
std::size_t& bytesReturned) = 0;
/// \brief issue get file request to get a remote file to a local file
///
/// \sa BasicClient::getFile for details
virtual ClientCode getFile(std::string const& remoteName,
std::string const& localName) = 0;
/// \brief issue get file request to get a remote file to a local file with SHA256 checksum
///
/// \sa BasicClient::getFile for details
virtual ClientCode getFile(std::string const& remoteName,
std::string const& localName, std::string& checksum) = 0;
/// \brief issue heartbeat to keep a connection alive
///
/// \sa BasicClient::heartbeat for details
virtual ClientCode heartbeat(bool forceSend = false) = 0;
/// \brief gets the write mode to be used set writemode.h for possbile modes
int writeMode()
{
return m_writeMode;
}
/// \brief get the current reqId
///
/// \return boost::uint32_t the current reqId
boost::uint32_t reqId()
{
return m_reqId;
}
virtual std::string hostId() = 0;
virtual std::string ipAddress() = 0;
virtual std::string port() = 0;
virtual int timeoutSeconds() = 0;
/// \brief connects to cs does not require fingerprint to match but insteads returns it
virtual void csConnect(std::string& fingerprint, std::string& certificate) = 0;
virtual bool sendCsRequest(std::string const& request, std::string& response) = 0;
virtual std::string password() = 0;
virtual ResponseData & getResponseData()
{
return m_responseData;
}
virtual void setResponseData(ResponseData const& respData)
{
m_responseData = respData;
}
virtual void setResponseData(const ClientCode responseCode, const mapHeaders_t & uriParams, const mapHeaders_t & headers, const std::string & data = "")
{
m_responseData.data = data;
m_responseData.headers = headers;
m_responseData.responseCode = responseCode;
m_responseData.uriParams = uriParams;
}
protected:
/// \brief connects to the given endpoint
///
/// \note
/// \li \c 0 for a window size means use system default value
virtual void syncConnect(boost::asio::ip::tcp::endpoint const& endpoint, ///< peer to connect to
int sendWindowSizeBytes, ///< tcp send window size to use (overrides system setting)
int receiveWindowSizeBytes ///< tcp receive window size to use (overrides system setting)
) = 0;
/// \brief async connects to the given endpoint
///
/// \note
/// \li \c 0 for a window size means use system default value
virtual void asyncConnect(boost::asio::ip::tcp::endpoint const& endpoint, ///< peer to connect to
int sendWindowSizeBytes, ///< tcp send window size to use (overrides system setting)
int receiveWindowSizeBytes ///< tcp receive window size to use (overrides system setting)
) = 0;
/// \brief increments the current req id and returns it)
///
/// \return boost::uint32_t: the next req id
unsigned int nextReqId()
{
return ++m_reqId;
}
ResponseData m_responseData;
private:
bool m_writeMode; ///< write mode to use see writemode.h for possible modes
unsigned int m_reqId; ///< simple id that is incremented for each request made for a given Session.
};
/// \brief basic client for interacting with the cx process server
///
/// \note
/// \li \c all requests will automatically connect and login as needed
///
/// \sa protocoltraits.h for details about the types of traits that can be used for PROTOCOL_TRAITS
template <typename PROTOCOL_TRAITS>
class BasicClient : public ClientAbc {
public:
#define USE_DEFAULT_TIMEOUT_SECONDS -1
// NOTE: this is relatively small as it is use for
// for reading the first part of the http reply and
// parsing it, once that is done then the actual callers
// buffer is used to received any non http reply parts
// e.g. getfile data, listfile data.
#define READ_BUFFER_SIZE 2048
/// \brief simple POD used to track totall bytes sent
struct writeNInfo {
std::size_t m_bytesSent;
};
/// constructor
BasicClient(std::string const& ipAddress, ///< server ip for connection
std::string const& port, ///< port for connection
std::string const& hostId, ///< host id of the client
int maxBufferSizeBytes, ///< max buffer size for socket read/write
int connectTimeoutSeconds, ///< inactive duration before considering a connect attempt timed out
int timeoutSeconds, ///< inactive duration before considering a connection timed out
bool keepAlive, ///< should server keep connection alive true: yes, false: no
int sendWindowSizeBytes, ///< tcp send window size. 0 means uses system value
int receiveWindowSizeBytes, ///< tcp receive window size. 0 means uses system value
std::time_t heartbeatIntervalSeconds, ///< interval in seconds when a heartbeat should be sent if no other activity
int writeMode, ///< write mode to use see writemode.h for possible modes
std::string const& password, ///< connection passphrase. optional only used for dev to override reading from default location
bool useFxLogin = false ///< indicates if fx login should be used. true: yes, false: no default no
)
: ClientAbc(writeMode),
m_ipAddress(ipAddress),
m_port(port),
m_hostId(hostId),
m_password(password),
m_cnonce(securitylib::genRandNonce(32), true),
m_protocolHandler(HttpProtocolHandler::CLIENT_SIDE),
m_keepAlive(keepAlive),
m_state(CLIENT_STATE_NEEDS_TO_CONNECT),
m_dataSize(0),
m_bytesTransferred(0),
m_transferredBytesLeftToProcess(0),
m_dataBytesLeftToRead(0),
m_maxBufferSizeBytes(maxBufferSizeBytes),
m_buffer(READ_BUFFER_SIZE),
m_connectTimeoutSeconds(0 == connectTimeoutSeconds ? 30 : connectTimeoutSeconds),
m_timeoutSeconds(timeoutSeconds),
m_timer(m_ioService),
m_sendWindowSizeBytes(sendWindowSizeBytes),
m_receiveWindowSizeBytes(receiveWindowSizeBytes),
m_loggingOut(false),
m_lastConnectionActivity(time(0)),
m_heartbeatIntervalSeconds(heartbeatIntervalSeconds < timeoutSeconds ? heartbeatIntervalSeconds : timeoutSeconds / 2),
m_ioServiceRunning(false),
m_eof(false),
m_usingSocketTimeouts(false),
m_selfsignedMustMatch(true),
m_useFxLogin(useFxLogin),
m_useCertAuth(false)
{}
BasicClient(std::string const& ipAddress, ///< server ip for connection
std::string const& port, ///< port for connection
std::string const& hostId, ///< host id of the client
int maxBufferSizeBytes, ///< max buffer size for socket read/write
int connectTimeoutSeconds, ///< inactive duration before considering a connect attempt timed out
int timeoutSeconds, ///< inactive duration before considering a connection timed out
bool keepAlive, ///< should server keep connection alive true: yes, false: no
int sendWindowSizeBytes, ///< tcp send window size. 0 means uses system value
int receiveWindowSizeBytes, ///< tcp receive window size. 0 means uses system value
std::time_t heartbeatIntervalSeconds, ///< interval in seconds when a heartbeat should be sent if no other activity
int writeMode ///< write mode to use see writemode.h for possible modes
)
: ClientAbc(writeMode),
m_ipAddress(ipAddress),
m_port(port),
m_hostId(hostId),
m_cnonce(securitylib::genRandNonce(32), true),
m_protocolHandler(HttpProtocolHandler::CLIENT_SIDE),
m_keepAlive(keepAlive),
m_state(CLIENT_STATE_NEEDS_TO_CONNECT),
m_dataSize(0),
m_bytesTransferred(0),
m_transferredBytesLeftToProcess(0),
m_dataBytesLeftToRead(0),
m_maxBufferSizeBytes(maxBufferSizeBytes),
m_buffer(READ_BUFFER_SIZE),
m_connectTimeoutSeconds(0 == connectTimeoutSeconds ? 30 : connectTimeoutSeconds),
m_timeoutSeconds(timeoutSeconds),
m_timer(m_ioService),
m_sendWindowSizeBytes(sendWindowSizeBytes),
m_receiveWindowSizeBytes(receiveWindowSizeBytes),
m_loggingOut(false),
m_lastConnectionActivity(time(0)),
m_heartbeatIntervalSeconds(heartbeatIntervalSeconds < timeoutSeconds ? heartbeatIntervalSeconds : timeoutSeconds / 2),
m_ioServiceRunning(false),
m_eof(false),
m_usingSocketTimeouts(false),
m_selfsignedMustMatch(true),
m_useFxLogin(false),
m_useCertAuth(true)
{}
/// \brief destructor
virtual ~BasicClient()
{ }
/// \brief abort an in progess request
virtual void abortRequest(bool disconnectOnly = false) {
logout(disconnectOnly);
}
void writeN(ConnectionAbc::writeBuffer_t const& buffer)
{
if (usingSocketTimeouts()) {
syncWriteN(buffer);
}
else {
asyncWriteN(buffer);
}
}
void writeN(char const * buffer, std::size_t length)
{
if (usingSocketTimeouts()) {
syncWriteN(buffer, length);
}
else {
asyncWriteN(buffer, length);
}
}
void syncWriteN(ConnectionAbc::writeBuffer_t const& buffer)
{
try {
m_writeNInfo.m_bytesSent += connection()->writeN(buffer);
}
catch (std::exception const& e) {
networkErrorMsg(e.what());
m_writeNInfo.m_bytesSent = 0;
throw ERROR_EXCEPTION << "error sending data: " << networkErrorMsg();
}
}
void syncWriteN(char const * buffer, std::size_t length)
{
try {
m_writeNInfo.m_bytesSent += connection()->writeN(buffer, length);
}
catch (std::exception const& e) {
networkErrorMsg(e.what());
m_writeNInfo.m_bytesSent = 0;
throw ERROR_EXCEPTION << "error sending data: " << networkErrorMsg();
}
}
void asyncWriteN(ConnectionAbc::writeBuffer_t const& buffer)
{
m_writeNInfo.m_bytesSent = 0;
if (!connection()->isTimedOut()) {
connection()->asyncWriteN(buffer,
boost::bind(&BasicClient<PROTOCOL_TRAITS>::handleAsyncWriteN,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
runAsyncRequest(m_timer,
boost::bind(&BasicClient<PROTOCOL_TRAITS>::handleTimeout,
this,
boost::asio::placeholders::error));
if (connection()->isTimedOut()) {
throw ERROR_EXCEPTION << "timed out (" << m_timeoutSeconds << ')';
}
if (networkError()) {
throw ERROR_EXCEPTION << "error sending data: " << networkErrorMsg();
}
}
}
void asyncWriteN(char const * buffer, std::size_t length)
{
m_writeNInfo.m_bytesSent = 0;
while (m_writeNInfo.m_bytesSent < length) {
if (!connection()->isTimedOut()) {
connection()->asyncWriteN(buffer + m_writeNInfo.m_bytesSent,
length - m_writeNInfo.m_bytesSent,
boost::bind(&BasicClient<PROTOCOL_TRAITS>::handleAsyncWriteN,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
runAsyncRequest(m_timer,
boost::bind(&BasicClient<PROTOCOL_TRAITS>::handleTimeout,
this,
boost::asio::placeholders::error));
if (connection()->isTimedOut()) {
throw ERROR_EXCEPTION << "timed out (" << m_timeoutSeconds << ')';
}
if (networkError()) {
throw ERROR_EXCEPTION << "error sending data: " << networkErrorMsg();
}
}
else {
throw ERROR_EXCEPTION << "timed out (" << m_timeoutSeconds << ')';
}
}
}
void handleAsyncWriteN(boost::system::error_code const& error, ///< holds result of the read
size_t bytesTransferred) ///< hold number of bytes written)
{
m_timer.cancel();
m_writeNInfo.m_bytesSent += bytesTransferred;
if (error) {
try {
networkErrorMsg(boost::lexical_cast<std::string>(error));
networkErrorMsg(" ");
networkErrorMsg(error.message());
}
catch (...) {
networkErrorMsg("handleAsyncWriteN unknown error");
}
}
}
/// \brief issue put file request to send data to a remote file
///
/// use to send data to a file on the server. If the total size of the data to be sent
/// is not pre-calculated then multiple put file requests can be used to send the data.
///
/// \exception throws ERROR_EXCEPTION on failure
///
/// \note
/// \li \c if more then 1 put file request is needed to send all the data, then no other
/// requests can be sent until a put file request with moreData set to false is issued
/// \li \c if all the data has been sent before knowing there is no more data (i.e. a
/// putfile request with moreData set to false has not been sent), then 1 additional put
/// file request with moreData set to false still needs to be sent to let the server know
/// all the data has been sent. In this case set dataSize to 0 and data to NULL (0)
virtual void putFile(std::string const& remoteName, ///< name of remote file to put the data into
std::size_t dataSize, ///< size of the data being sent in this request
char const * data, ///< the data to put in the file
bool moreData, ///< if there is more data to be sent true: yes, false: no
COMPRESS_MODE compressMode, ///< compress mode (see volumegroupsettings.h)
mapHeaders_t const & headers, ///< additional headers to send in the putfile request
bool createDirs = false, ///< indicates if missing dirs should be created (true: yes, false: no)
long long offset = PROTOCOL_DO_NOT_SEND_OFFSET ///< offset to write at
)
{
try {
startRequest();
nextReqId();
std::string digest(Authentication::buildPutFileId(m_hostId,
m_password,
HTTP_METHOD_POST,
HTTP_REQUEST_PUTFILE,
m_cnonce,
m_sessionId,
m_snonce,
remoteName,
(moreData ? '1' : '0'),
REQUEST_VER_CURRENT,
reqId()));
std::string request;
m_protocolHandler.formatPutFileRequest(remoteName, dataSize, moreData, reqId(), digest, request, ipAddress(), compressMode, headers, createDirs, offset);
ConnectionAbc::writeBuffer_t buffer;
resetResponseData();
#if 1
writeN(request.c_str(), request.size());
if (0 != dataSize) {
writeN(data, dataSize);
}
#else
buffer.push_back(boost::asio::buffer(request));
if (0 != dataSize) {
buffer.push_back(boost::asio::buffer(data, dataSize));
}
writeN(buffer);
#endif
ClientCode result;
m_lastConnectionActivity = time(0);
if (!moreData) {
result = getReply();
readData();
endRequest();
}
else {
m_state = CLIENT_STATE_MORE_DATA;
}
}
catch (ThrottlingException te)
{
endRequest();
te << "(sid: " << m_sessionId << ')'
<< ", remoteName: " << remoteName
<< ", dataSize: " << dataSize
<< ", moreData: " << moreData
<< " (may want to check server side logs for this sid)";
throw te;
}
catch (std::exception const& e) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ')'
<< ", remoteName: " << remoteName
<< ", dataSize: " << dataSize
<< ", moreData: " << moreData
<< ", error: " << e.what()
<< " (may want to check server side logs for this sid)";
}
}
virtual void putFile(std::string const& remoteName, ///< name of remote file to put the data into
std::size_t dataSize, ///< size of the data being sent in this request
char const * data, ///< the data to put in the file
bool moreData, ///< if there is more data to be sent true: yes, false: no
COMPRESS_MODE compressMode, ///< compress mode (see volumegroupsettings.h)
bool createDirs = false, ///< indicates if missing dirs should be created (true: yes, false: no)
long long offset = PROTOCOL_DO_NOT_SEND_OFFSET ///< offset to write at
)
{
putFile(remoteName, dataSize, data, moreData, compressMode, s_emptyHeader, createDirs, offset);
}
/// \brief issue put file request to send a local file to a remote file
///
/// use to send a local file to a remote file.
///
/// \exception throws ERROR_EXCEPTION on failure
///
/// \note
/// \li \c always uses binary mode
///
virtual void putFile(std::string const& remoteName, ///< name of remote file to put the data into
std::string const& localName, ///< name of local file to send
COMPRESS_MODE compressMode, ///< compress mode (see volumegroupsettings.h)
mapHeaders_t const & headers, ///< additional headers to send in putfile request
bool createDirs = false ///< indicates if missing dirs should be created (true: yes, false: no)
)
{
try {
FIO::Fio iFio(ExtendedLengthPath::name(localName).c_str(), FIO::FIO_READ_EXISTING);
// use same size used for reading to send from the connection
try {
std::vector<char> buffer(m_maxBufferSizeBytes);
long bytesRead = 0;
do {
bytesRead = iFio.read(&buffer[0], buffer.size());
if (bytesRead < 0) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ") error reading file " << localName << ": " << iFio.errorAsString();
}
else {
putFile(remoteName, bytesRead, &buffer[0], (iFio.eof() ? false : true), compressMode, headers, createDirs);
}
} while (!iFio.eof());
}
catch (std::exception const& e) {
throw;
}
}
catch (ThrottlingException & te)
{
te << "local file name: " << localName;
throw te;
}
catch (std::exception const& e) {
throw ERROR_EXCEPTION << "(sid: " << m_sessionId
<< ") open local file " << localName << " failed: "
<< e.what();
}
}
virtual void putFile(std::string const& remoteName, ///< name of remote file to put the data into
std::string const& localName, ///< name of local file to send
COMPRESS_MODE compressMode, ///< compress mode (see volumegroupsettings.h)
bool createDirs = false ///< indicates if missing dirs should be created (true: yes, false: no)
)
{
putFile(remoteName, localName, compressMode, s_emptyHeader, createDirs);
}
/// \brief issue list file request
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: no matches found
///
/// \exception ERROR_EXCEPTION on failure
virtual ClientCode listFile(std::string const& fileSpec, ///< file specification to list. Use stnadard glob syntax
std::string & files) ///< receives the list of files each separated by new-line (\\n)
{
try {
startRequest();
nextReqId();
std::string digest(Authentication::buildListFileId(m_hostId,
m_password,
HTTP_METHOD_GET,
HTTP_REQUEST_LISTFILE,
m_cnonce,
m_sessionId,
m_snonce,
fileSpec,
REQUEST_VER_CURRENT,
reqId()));
std::string request;
m_protocolHandler.formatListFileRequest(fileSpec, reqId(), digest, request, ipAddress());
writeN(request.data(), request.size());
m_lastConnectionActivity = time(0);
getReply();
readData(files);
endRequest();
return resultCode();
}
catch (std::exception const& e) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ')'
<< " fileSpec: " << fileSpec
<< ' ' << e.what();
}
}
/// \brief issue rename file request
///
/// \note
/// \li \c finalPaths should be used to tell transport it should perform the "final" rename instead of time shot manager
/// it should be a semi-conlon (';') separated list of the final paths that the file should get a "copy" of the new file.
/// hard links are used to create the "copies" unless transport server was built with RENAME_COPY_ON_FAILED_LINK defined.
/// in that case transport will attempt to copy the file if the hard link fails.
///
/// \exception ERROR_EXCEPTION on failure
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: oldName not found
virtual ClientCode renameFile(std::string const& oldName, ///< the name of the file that is to be renamed
std::string const& newName, ///< the new name to use
COMPRESS_MODE compressMode, ///< compress mode in affect (see compressmode.h)
mapHeaders_t const& headers, ///< additional headers to be sent in renamefile request
std::string const& finalPaths = std::string() ///< semi-colon (';') separated list of all paths that should get a "copy" of the renamed file
)
{
try {
startRequest();
nextReqId();
std::string digest(Authentication::buildRenameFileId(m_hostId,
m_password,
HTTP_METHOD_GET,
HTTP_REQUEST_RENAMEFILE,
m_cnonce,
m_sessionId,
m_snonce,
oldName,
newName,
REQUEST_VER_CURRENT,
reqId()));
std::string request;
m_protocolHandler.formatRenameFileRequest(oldName, newName, compressMode, reqId(), digest, request, ipAddress(), headers, finalPaths);
writeN(request.data(), request.size());
m_lastConnectionActivity = time(0);
getReply();
readData();
endRequest();
return resultCode();
}
catch (std::exception const& e) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ')'
<< " oldName: " << oldName
<< " newName: " << newName
<< ' ' << e.what();
}
}
/// \brief issue rename file request
///
/// \note
/// \li \c finalPaths should be used to tell transport it should perform the "final" rename instead of time shot manager
/// it should be a semi-conlon (';') separated list of the final paths that the file should get a "copy" of the new file.
/// hard links are used to create the "copies" unless transport server was built with RENAME_COPY_ON_FAILED_LINK defined.
/// in that case transport will attempt to copy the file if the hard link fails.
///
/// \exception ERROR_EXCEPTION on failure
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: oldName not found
virtual ClientCode renameFile(std::string const& oldName, ///< the name of the file that is to be renamed
std::string const& newName, ///< the new name to use
COMPRESS_MODE compressMode, ///< compress mode in affect (see compressmode.h)
std::string const& finalPaths = std::string() ///< semi-colon (';') separated list of all paths that should get a "copy" of the renamed file
)
{
return renameFile(oldName, newName, compressMode, s_emptyHeader, finalPaths);
}
/// \brief issue get file request to get a remote file to a buffer
///
/// \note
/// \li \c if CLIENT_RESULT_MORE_DATA is returned, you must copy all the data returned
/// in "data" before making the next call as "data" will be overwritten on each call
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success and no more data to receive
/// \li \c CLIENT_RESULT_MORE_DATA: on success and more data to reveive
/// \li \c CLIENT_RESULT_NOT_FOUND: remote file not found
///
/// \exception ERROR_EXCEPTION on failure
virtual ClientCode getFile(std::string const& name, ///< file to get
std::size_t dataSize, ///< size of the buffer pointed to by data
char * data, ///< points to the get file data returned
std::size_t& bytesReturned) ///< set the the number of bytes returned in data
{
try {
bytesReturned = 0;
startRequest();
nextReqId();
if (CLIENT_STATE_REQUEST_STARTED == m_state) {
std::string digest(Authentication::buildGetFileId(m_hostId,
m_password,
HTTP_METHOD_GET,
HTTP_REQUEST_GETFILE,
m_cnonce,
m_sessionId,
m_snonce,
name,
REQUEST_VER_CURRENT,
reqId()));
std::string request;
m_protocolHandler.formatGetFileRequest(name, reqId(), digest, request, ipAddress());
writeN(request.data(), request.size());
m_lastConnectionActivity = time(0);
getReply();
}
bytesReturned = readDataN(data, dataSize);
if (CLIENT_STATE_MORE_DATA != m_state) {
endRequest();
}
return resultCode();
}
catch (std::exception const& e) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ')'
<< " getName: " << name
<< " dataSize: " << dataSize
<< ' ' << e.what();
}
}
/// \brief issue get file request to get remote file data in ragne from starting offset upto dataSize length to a buffer
///
/// \note
/// \li \c if CLIENT_RESULT_MORE_DATA is returned, you must copy all the data returned
/// in "data" before making the next call as "data" will be overwritten on each call
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success and no more data to receive
/// \li \c CLIENT_RESULT_MORE_DATA: on success and more data to reveive
/// \li \c CLIENT_RESULT_NOT_FOUND: remote file not found
///
/// \exception ERROR_EXCEPTION on failure
virtual ClientCode getFile(std::string const& name,
size_t offset,
std::size_t dataSize,
char * data,
std::size_t& bytesReturned)
{
return getFile(name, dataSize, data, bytesReturned);
}
/// \brief issue get file request to get a remote file to a local file
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: remote file not found
///
/// \exception ERROR_EXCEPTION on failure
///
/// \note
/// \li \c always uses binary mode
///
virtual ClientCode getFile(std::string const& remoteName, ///< remote file to get
std::string const& localName) ///< local file to put data into
{
try {
ClientCode rc;
size_t bytesReturned;
FIO::Fio oFio(ExtendedLengthPath::name(localName).c_str(), FIO::FIO_OVERWRITE);
std::vector<char> buffer(m_maxBufferSizeBytes);
do {
rc = getFile(remoteName, buffer.size(), &buffer[0], bytesReturned);
if (bytesReturned > 0) {
if (oFio.write(&buffer[0], bytesReturned) < 0) {
throw ERROR_EXCEPTION << "error writing data to local file: " << oFio.errorAsString();
}
}
} while (CLIENT_RESULT_MORE_DATA == rc);
if (WRITE_MODE_FLUSH == writeMode() && !oFio.flushToDisk()) {
throw ERROR_EXCEPTION << "error flushing data to disk: " << oFio.errorAsString();
}
return rc;
}
catch (std::exception const& e) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ')'
<< " remoteName: " << remoteName
<< " localName: " << localName
<< ' ' << e.what();
}
}
/// \brief issue get file request to get a remote file to a local file with SHA256 checksum
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: remote file not found
///
/// \exception ERROR_EXCEPTION on failure
///
/// \note
/// \li \c always uses binary mode
///
virtual ClientCode getFile(std::string const& remoteName, ///< remote file to get
std::string const& localName, ///< local file to put data into
std::string& checksum) ///< buffer to return checksum
{
try {
ClientCode rc;
size_t bytesReturned;
FIO::Fio oFio(ExtendedLengthPath::name(localName).c_str(), FIO::FIO_OVERWRITE);
std::vector<char> buffer(m_maxBufferSizeBytes);
unsigned char mac[SHA256_DIGEST_LENGTH];
SHA256_CTX sha256;
SHA256_Init(&sha256);
do {
rc = getFile(remoteName, buffer.size(), &buffer[0], bytesReturned);
if (bytesReturned > 0) {
if (oFio.write(&buffer[0], bytesReturned) < 0) {
throw ERROR_EXCEPTION << "error writing data to local file: " << oFio.errorAsString();
}
SHA256_Update(&sha256, &buffer[0], bytesReturned);
}
} while (CLIENT_RESULT_MORE_DATA == rc);
if (WRITE_MODE_FLUSH == writeMode() && !oFio.flushToDisk()) {
throw ERROR_EXCEPTION << "error flushing data to disk: " << oFio.errorAsString();
}
SHA256_Final(mac, &sha256);
std::stringstream ss;
for (int i = 0; i < SHA256_DIGEST_LENGTH; ++i) {
ss << std::hex << std::setfill('0') << std::setw(2) << (int)mac[i];
}
checksum = ss.str();
return rc;
}
catch (std::exception const& e) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ')'
<< " remoteName: " << remoteName
<< " localName: " << localName
<< ' ' << e.what();
}
}
/// \brief issue delete file request
///
/// see documenation for FindDelete for complete details
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: remote file not found
/// \exception ERROR_EXCEPTION on failure
virtual ClientCode deleteFile(std::string const& names, ///< semi-colon seprated list of files and/or dirs to delete
int mode = FindDelete::FILES_ONLY ///< delete mode to use (FILES_ONLY, RECURSE_DIRS, or can combine both using logical-or ('|'))
)
{
return deleteFile(names, std::string(), mode);
}
/// \brief issue delete file request
///
/// see documenation for FindDelete for complete details
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: remote file not found
/// \exception ERROR_EXCEPTION on failure
virtual ClientCode deleteFile(std::string const& names, ///< semi-colon seprated list of files and/or dirs to delete
std::string const& fileSpec, ///< file spec to use to match file/dir names when name in names is a dir
int mode = FindDelete::FILES_ONLY ///< delete mode to use (FILES_ONLY, RECURSE_DIRS, or can combine both using logical-or ('|'))
)
{
try {
startRequest();
nextReqId();
std::string digest(Authentication::buildDeleteFileId(m_hostId,
m_password,
HTTP_METHOD_GET,
HTTP_REQUEST_DELETEFILE,
m_cnonce,
m_sessionId,
m_snonce,
names,
fileSpec,
REQUEST_VER_CURRENT,
reqId()));
std::string request;
m_protocolHandler.formatDeleteFileRequest(names, fileSpec, mode, reqId(), digest, request, ipAddress());
writeN(request.data(), request.size());
m_lastConnectionActivity = time(0);
getReply();
readData();
endRequest();
return resultCode();
}
catch (std::exception const& e) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ')'
<< " names: " << names
<< " fileSpec: " << fileSpec
<< " mode: 0x" << std::hex << mode << std::dec
<< ' ' << e.what();
}
}
/// \brief issue heartbeat to keep connection alive
///
/// \param forceSend determines if heartbeat should be sent even if duration has not expired
/// true: yes false: no (default)
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \exception ERROR_EXCEPTION on failure
virtual ClientCode heartbeat(bool forceSend = false)
{
if (!forceSend && (time(0) - m_lastConnectionActivity) < m_heartbeatIntervalSeconds) {
return CLIENT_RESULT_OK;
}
try {
startRequest();
nextReqId();
std::string digest(Authentication::buildHeartbeatId(m_hostId,
m_password,
HTTP_METHOD_GET,
HTTP_REQUEST_HEARTBEAT,
m_cnonce,
m_sessionId,
m_snonce,
REQUEST_VER_CURRENT,
reqId()));
std::string request;
m_protocolHandler.formatHeartbeatRequest(reqId(), digest, request, ipAddress());
writeN(request.data(), request.size());
m_lastConnectionActivity = time(0);
getReply();
readData();
endRequest();
return resultCode();
}
catch (std::exception const& e) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ')'
<< ' ' << e.what();
}
}
virtual std::string hostId()
{
return m_hostId;
}
virtual std::string ipAddress()
{
return m_connectedAddress;
}
virtual std::string port()
{
return m_port;
}
virtual int timeoutSeconds()
{
return m_timeoutSeconds;
}
/// \brief connects to cs does not require fingerprint to match but insteads returns it
virtual void csConnect(std::string& fingerprint, std::string& certificate)
{
m_selfsignedMustMatch = false;
connectSocket();
if (connection()->usingSsl()) {
fingerprint = dynamic_cast<SslConnection*>(connection().get())->getFingerprint();
certificate = dynamic_cast<SslConnection*>(connection().get())->getCertificate();
}
}
virtual bool sendCsRequest(std::string const& request, std::string& response)
{
ON_BLOCK_EXIT(boost::bind(&BasicClient::csDisconnect, this));
if (!connection()->isOpen()) {
connectSocket();
}
writeN(request.data(), request.size());
return readCsRequestResponse(response);
}
virtual void csDisconnect()
{
m_selfsignedMustMatch = true;
connection()->disconnect();
}
virtual std::string password()
{
return m_password;
}
protected:
/// \brief resets internal state so the client can be reused
void reset() {
m_dataSize = 0;
m_bytesTransferred = 0;
m_transferredBytesLeftToProcess = 0;
m_dataBytesLeftToRead = 0;
m_lastConnectionActivity = time(0);
m_eof = false;
m_networkError.clear();
m_writeNInfo.m_bytesSent = 0;
m_loggingOut = false;
m_snonce.clear();
connection()->clearTimedOut();
m_eof = false;
}
/// \brief get a pointer to the connection object
///
/// \return
/// \li \c pointer to the connection object used by this client
/// \see ConnectionAbc::ptr details on the pointer type
virtual ConnectionAbc::ptr connection() = 0;
/// \brief get the io service used by the client
boost::asio::io_service& ioService() {
return m_ioService;
}
/// \brief get the current client state
ClientState state() {
return m_state;
}
void login()
{
login(m_useFxLogin ? HTTP_REQUEST_FXLOGIN : HTTP_REQUEST_LOGIN);
}
/// \brief logins into the cx process server
///
/// \exception throws ERROR_EXCEPTION on failure
void login(char const* requestName) {
try {
std::string fingerprint;
if (connection()->usingSsl()) {
fingerprint = dynamic_cast<SslConnection*>(connection().get())->getFingerprint();
}
m_cnonce = securitylib::genRandNonce(32, true); // each login gets a new nonce to prevent replay of login
std::string digest(Authentication::buildLoginId(m_hostId,
fingerprint,
m_password,
HTTP_METHOD_GET,
requestName,
m_cnonce,
REQUEST_VER_CURRENT));
std::string request;
m_protocolHandler.formatLoginRequest(requestName, m_cnonce, m_hostId, digest, request, ipAddress());
writeN(request.data(), request.size());
m_lastConnectionActivity = time(0);
getReply();
std::string loginResponse;
readData(loginResponse);
verifyLoginResponse(loginResponse, fingerprint, requestName);
}
catch (std::exception const& e) {
abortRequest(true);
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ") " << e.what();
}
}
/// \brief log out from cx process server
void logout(bool disconnectOnly = false)
{
if (!m_loggingOut) {
m_loggingOut = true;
if (!disconnectOnly) {
try {
if (CLIENT_STATE_NEEDS_TO_CONNECT != m_state
&& !networkError()
&& !connection()->isTimedOut()
) {
nextReqId();
std::string digest(Authentication::buildLogoutId(m_hostId,
m_password,
HTTP_METHOD_GET,
HTTP_REQUEST_LOGOUT,
m_cnonce,
m_sessionId,
m_snonce,
REQUEST_VER_CURRENT,
reqId()));
std::string request;
m_protocolHandler.formatLogoutRequest(reqId(), digest, request, ipAddress());
writeN(request.data(), request.size());
m_lastConnectionActivity = time(0);
getReply();
readData();
}
}
catch (std::exception const& e) {
(e);
}
}
disconnect();
reset();
}
}
/// \brief verifies that it knows the server it logged into
///
/// \exception throws ERROR_EXCEPTION on failure
void verifyLoginResponse(std::string const& response, ///< holds the servers response to login request
std::string const& fingerprint, ///< server certiricate fingerprint
char const* requestName) ///< the actual login request sent
{
// login response should be
// snonce=snonce&sessionid=sessionId&id=digest
typedef boost::tokenizer<boost::char_separator<char> > tokenizer_t;
boost::char_separator<char> sep("=&");
tokenizer_t tokens(response, sep);
tokenizer_t::iterator iter(tokens.begin());
tokenizer_t::iterator iterEnd(tokens.end());
if (iter == iterEnd) {
throw ERROR_EXCEPTION << "login response invalid";
}
std::string tag;
// get snonce=snonce
tag = *iter;
boost::trim(tag);
if (tag != HTTP_PARAM_TAG_SERVER_NONCE) {
throw ERROR_EXCEPTION << "missing login response snonce";
}
++iter;
if (iter == iterEnd) {
throw ERROR_EXCEPTION << "login response invalid";
}
m_snonce = *iter;
boost::trim(m_snonce);
++iter;
if (iter == iterEnd) {
throw ERROR_EXCEPTION << "login response invalid";
}
// get sessionid=sessionid
tag = *iter;
boost::trim(tag);
if (tag != HTTP_PARAM_TAG_SESSIONID) {
throw ERROR_EXCEPTION << "missing login response sessionid";
}
++iter;
if (iter == iterEnd) {
throw ERROR_EXCEPTION << "login response invalid";
}
m_sessionId = *iter;
boost::trim(m_sessionId);
++iter;
if (iter == iterEnd) {
throw ERROR_EXCEPTION << "login response invalid";
}
// get id=digest
tag = *iter;
boost::trim(tag);
if (tag != HTTP_PARAM_TAG_ID) {
throw ERROR_EXCEPTION << "missing login response id";
}
++iter;
if (iter == iterEnd) {
throw ERROR_EXCEPTION << "login response invalid";
}
std::string digest(*iter);
boost::trim(digest);
if (!Authentication::verifyLoginResponseId(m_hostId,
fingerprint,
m_password,
HTTP_METHOD_GET,
requestName,
m_cnonce,
m_sessionId,
m_snonce,
digest)) {
throw ERROR_EXCEPTION << "missing login response id failed validation";
}
++iter;
if (iter != iterEnd) {
throw ERROR_EXCEPTION << "login response invalid";
}
}
/// \brief does clean up after a request ends
void endRequest() {
m_dataSize = 0;
m_bytesTransferred = 0;
m_transferredBytesLeftToProcess = 0;
m_dataBytesLeftToRead = 0;
m_state = CLIENT_STATE_IDLE;
if (!m_keepAlive) {
logout();
}
}
/// \brief starts a request
///
/// will connect if needed
/// \exception throws ERROR_EXCEPTION on failure
void startRequest() {
switch (m_state) {
case CLIENT_STATE_NEEDS_TO_CONNECT:
connect();
m_state = CLIENT_STATE_REQUEST_STARTED;
break;
case CLIENT_STATE_IDLE:
m_state = CLIENT_STATE_REQUEST_STARTED;
break;
case CLIENT_STATE_MORE_DATA:
break;
default:
throw ERROR_EXCEPTION << "request in progess, you must either finish or abort that request before starting a new request using this client object";
}
}
/// \brief disconnects from the cx process server
virtual void disconnect() {
try {
connection()->disconnect();
}
catch (...) {
// nothing to do
// just preventing exceptions from being thrown
// as this can be called in an arbitrary thread
}
m_state = CLIENT_STATE_NEEDS_TO_CONNECT;
}
/// \brief connects and logs into the cx process server
///
/// \exception throws ERROR_EXCEPTION on failure
void connect() {
connectSocket();
m_state = CLIENT_STATE_IDLE;
login();
}
/// \brief sets socket timeout if supported
void setSocketTimeouts()
{
if (connection()->setSocketTimeouts(m_timeoutSeconds * 1000)) {
m_usingSocketTimeouts = true;
}
else{
// MAYBE: remove this if proven not to help or recvmsg issue is resolved in a different way
// even though using async requests, still set socket timeout if supported
// as this may help with some edge cases that causes recvmsg to not respond
setSocketTimeoutForAsyncRequests(connection()->lowestLayerSocket().native_handle(), m_timeoutSeconds * 1000);
}
}
/// \brief connects to the given endpoint
///
/// before connecting sets the provided window sizes on the socket as that needs to be done
/// before connection
///
/// \note
/// \li \c 0 for a window size means use system default value
/// \exception throws ERROR_EXCEPTION on failure
virtual void connectSocket()
{
m_connectedAddress.clear();
std::string innerExceptionMsg;
std::vector<std::string> addresses;
boost::split(addresses, m_ipAddress, boost::is_any_of(","));
std::vector<std::string>::const_iterator addrIter = addresses.begin();
for (/**/; m_connectedAddress.empty() && addrIter != addresses.end(); addrIter++)
{
boost::regex ipv4pattern("(\\d{1,3}(\\.\\d{1,3}){3})");
boost::smatch matches;
if (boost::regex_search(*addrIter, matches, ipv4pattern))
{
// boost.asio wants host byte order, inet_addr returns network byte order
unsigned short port = boost::lexical_cast<unsigned short>(m_port);
boost::asio::ip::address_v4 address(ntohl(inet_addr(addrIter->c_str())));
boost::asio::ip::tcp::endpoint endpoint(address, (unsigned short)port);
m_connectedAddress = *addrIter;
connectSocketEndpoint(endpoint, innerExceptionMsg);
}
else
{
boost::system::error_code ec;
boost::asio::ip::tcp::resolver resolver(m_ioService);
boost::asio::ip::tcp::resolver::query query(addrIter->c_str(), m_port.c_str());
boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query, ec);
if (ec)
{
innerExceptionMsg += *addrIter + " : "
+ boost::lexical_cast<std::string>(ec.value()) +", " + ec.message() + ". ";
continue;
}
for (/*empty*/;
m_connectedAddress.empty() &&
iterator != boost::asio::ip::tcp::resolver::iterator();
iterator++)
{
if (!iterator->endpoint().address().is_v4())
continue;
boost::asio::ip::tcp::endpoint endpoint(iterator->endpoint());
m_connectedAddress = iterator->endpoint().address().to_string();
connectSocketEndpoint(endpoint, innerExceptionMsg);
}
}
}
if (m_connectedAddress.empty())
{
throw ERROR_EXCEPTION << innerExceptionMsg;
}
}
void connectSocketEndpoint(boost::asio::ip::tcp::endpoint& endpoint, std::string& errMsg)
{
try {
connection()->socketOpen(endpoint, m_sendWindowSizeBytes, m_receiveWindowSizeBytes);
setSocketTimeouts();
if (usingSocketTimeouts()) {
syncConnect(endpoint, m_sendWindowSizeBytes, m_receiveWindowSizeBytes);
}
else {
asyncConnect(endpoint, m_sendWindowSizeBytes, m_receiveWindowSizeBytes);
}
}
catch (const std::exception& e)
{
// there aren't any options to log this connection failure, just proceed
errMsg += m_connectedAddress + " : " + e.what() + ". ";
m_connectedAddress.clear();
}
}
/// \brief reads all returned data and throws it away
///
/// blocks until all remaining data is read, error is encountered, or read times out
/// \exception throws ERROR_EXCEPTION on failure
void readData() {
if (m_dataBytesLeftToRead > 0) {
if (m_transferredBytesLeftToProcess > 0) {
m_dataBytesLeftToRead -= m_transferredBytesLeftToProcess;
m_transferredBytesLeftToProcess = 0;
}
while (m_dataBytesLeftToRead > 0 && !m_eof) {
readSome(&m_buffer[0], m_buffer.size());
m_dataBytesLeftToRead -= m_bytesTransferred;
m_transferredBytesLeftToProcess = 0;
}
if (m_dataBytesLeftToRead > 0 && m_eof) {
throw ERROR_EXCEPTION << "socket returend EOF with " << m_dataBytesLeftToRead << " bytes left to read";
}
}
m_state = CLIENT_STATE_IDLE;
}
/// \brief reads all returned data and copies it into data
///
/// blocks until all remaining data is read, error is encountered, or read times out
/// \exception throws ERROR_EXCEPTION on failure
void readData(std::string & data) ///< string to receive the read data
{
data.clear();
if (m_dataBytesLeftToRead > 0) {
if (m_transferredBytesLeftToProcess > 0) {
data.append(&m_buffer[m_bytesTransferred - m_transferredBytesLeftToProcess],
m_transferredBytesLeftToProcess);
m_dataBytesLeftToRead -= m_transferredBytesLeftToProcess;
m_transferredBytesLeftToProcess = 0;
}
std::vector<char> buffer(m_maxBufferSizeBytes);
while (m_dataBytesLeftToRead > 0 && !m_eof) {
readSome(&buffer[0], buffer.size());
m_dataBytesLeftToRead -= m_bytesTransferred;
data.append(&buffer[0], m_bytesTransferred);
m_transferredBytesLeftToProcess = 0;
}
if (m_dataBytesLeftToRead > 0 && m_eof) {
throw ERROR_EXCEPTION << "socket returend EOF with " << m_dataBytesLeftToRead << " bytes left to read";
}
}
m_state = CLIENT_STATE_IDLE;
}
/// \brief reads dataSize bytes
///
/// blocks until dataSize bytes read, error, or read times out if time out set
/// (if time out not set could block for ever). Can read less then dataSize on EOF
///
/// \returns
/// \li \c number of bytes read (0 indicates socket eof reached and no bytes read)
std::size_t readDataN(char * data, ///< points to buffer to receive read data
std::size_t dataSize) ///< size of buffer pointed to by data
{
std::size_t bytesLeft = dataSize;
while (bytesLeft > 0) {
std::size_t bytesRead = readData(data + (dataSize - bytesLeft), bytesLeft);
if (0 == bytesRead) {
return dataSize - bytesLeft;
}
bytesLeft -= bytesRead;
}
return dataSize;
}
/// \brief reads returned data up to dataSize and places into data
///
/// blocks until at least 1 byte is read, an error is encountered, or read times out
/// \returns
/// \li \c number of bytes read (0 indicates socket eof reached)
/// \exception throws ERROR_EXCEPTION on failure
std::size_t readData(char * data, ///< points to buffer to receive read data
std::size_t dataSize) ///< size of buffer pointed to by data
{
if (0 == m_dataBytesLeftToRead) {
m_state = CLIENT_STATE_IDLE;
return 0;
}
m_state = CLIENT_STATE_MORE_DATA;
if (m_transferredBytesLeftToProcess > 0) {
// make sure copy the lesser of the size of the destination buffer or the bytes left to process
// that way will not overflow destination as well as not copy more then the bytes left
std::size_t bytesToRead = (dataSize < m_transferredBytesLeftToProcess ?
dataSize : m_transferredBytesLeftToProcess);
// make sure to start copying from the first byte that has not yet been processed
// and only for the byteesToRead (start idx + bytesToRead)
std::copy(m_buffer.begin() + (m_bytesTransferred - m_transferredBytesLeftToProcess),
m_buffer.begin() + (m_bytesTransferred - m_transferredBytesLeftToProcess + bytesToRead),
data);
m_dataBytesLeftToRead -= bytesToRead;
m_transferredBytesLeftToProcess -= bytesToRead;
if (0 == m_dataBytesLeftToRead && 0 == m_transferredBytesLeftToProcess) {
m_state = CLIENT_STATE_IDLE;
}
return bytesToRead;
}
readSome(data, dataSize);
m_dataBytesLeftToRead -= m_bytesTransferred;
m_transferredBytesLeftToProcess = 0;
if (m_dataBytesLeftToRead > 0 && m_eof) {
throw ERROR_EXCEPTION << "socket returend EOF with " << m_dataBytesLeftToRead << " bytes left to read";
}
if (0 == m_dataBytesLeftToRead) {
m_state = CLIENT_STATE_IDLE;
}
return m_bytesTransferred;
}
/// \brief gets the reply from cx process server
///
/// \exception throws ERROR_EXCEPTION on failure
ClientCode getReply() {
m_eof = false;
m_state = CLIENT_STATE_READING_REPLY;
m_protocolHandler.reset();
readSome(&m_buffer[0], m_buffer.size());
int result = HttpProtocolHandler::PROTOCOL_NEED_MORE_DATA;
while (result == HttpProtocolHandler::PROTOCOL_NEED_MORE_DATA && !m_eof) {
if (0 == m_bytesTransferred) {
if (connection()->isTimedOut()) {
throw ERROR_EXCEPTION << "timed out (" << m_timeoutSeconds << ')';
}
else {
throw ERROR_EXCEPTION << "expecting more data, but got eof";
}
}
result = m_protocolHandler.process(m_transferredBytesLeftToProcess, &m_buffer[0]);
switch (result) {
case HttpProtocolHandler::PROTOCOL_ERROR:
break;
case HttpProtocolHandler::PROTOCOL_COMPLETE:
break;
case HttpProtocolHandler::PROTOCOL_HAVE_REQUEST:
break;
case HttpProtocolHandler::PROTOCOL_NEED_MORE_DATA:
readSome(&m_buffer[0], m_buffer.size());
break;
default:
throw ERROR_EXCEPTION << "unexpected return value from protocol handler: " << result;
break;
}
}
if (result == HttpProtocolHandler::PROTOCOL_HAVE_REQUEST) {
m_state = CLIENT_STATE_MORE_DATA;
m_dataSize = m_protocolHandler.dataSize();
m_dataBytesLeftToRead = m_dataSize;
}
// will throw if there were errors, otherwise do not care about the result
return resultCode();
}
typedef boost::function<void(boost::system::error_code const & error)> timeoutHandler_t;
void runAsyncRequest(boost::asio::deadline_timer& timeoutTimer,
timeoutHandler_t timeoutHandler,
int timeoutSeconds = USE_DEFAULT_TIMEOUT_SECONDS)
{
try {
if (!m_ioServiceRunning) {
m_ioServiceRunning = true;
timeoutTimer.expires_from_now(boost::posix_time::seconds(USE_DEFAULT_TIMEOUT_SECONDS == timeoutSeconds ? m_timeoutSeconds : timeoutSeconds));
timeoutTimer.async_wait(timeoutHandler);
m_ioService.reset();
m_ioService.run();
m_ioServiceRunning = false;
}
}
catch (std::exception const& e) {
m_ioServiceRunning = false;
networkErrorMsg(e.what());
throw ERROR_EXCEPTION << e.what();
}
}
void readSome(char* buffer, ///< points to bufer to recevie read data
std::size_t size) ///< size of buffer pointed to by buffer
{
if (usingSocketTimeouts()) {
syncReadSome(buffer, size);
}
else {
asyncReadSome(buffer, size);
}
m_lastConnectionActivity = time(0);
}
void syncReadSome(char* buffer, ///< points to bufer to recevie read data
std::size_t size) ///< size of buffer pointed to by buffer
{
try {
m_bytesTransferred = connection()->readSome(buffer, size);
m_eof = connection()->eof();
m_transferredBytesLeftToProcess = m_bytesTransferred;
}
catch (std::exception const& e) {
networkErrorMsg(e.what());
m_transferredBytesLeftToProcess = 0;
throw ERROR_EXCEPTION << "read data from socket failed: " << networkErrorMsg();
}
}
/// \brief reads some data from the socket
///
/// blocks until at least 1 byte is read, an error is encountered, or read times out
///
/// \returns
/// \li \c number of bytes read (0 indicates socket eof reached)
/// \exception throws ERROR_EXCEPTION on failure
void asyncReadSome(char* buffer, ///< points to bufer to recevie read data
std::size_t size) ///< size of buffer pointed to by buffer
{
if (!connection()->isTimedOut()) {
connection()->asyncReadSome(buffer, size,
boost::bind(&BasicClient<PROTOCOL_TRAITS>::handleAsyncRead,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
runAsyncRequest(m_timer,
boost::bind(&BasicClient<PROTOCOL_TRAITS>::handleTimeout,
this,
boost::asio::placeholders::error));
if (connection()->isTimedOut()) {
throw ERROR_EXCEPTION << "timed out (" << m_timeoutSeconds << ')';
}
if (networkError()) {
throw ERROR_EXCEPTION << "read data from socket failed: " << networkErrorMsg();
}
}
else {
throw ERROR_EXCEPTION << "timed out (" << m_timeoutSeconds << ')';
}
}
/// \brief callback for processing data read by asyncReadSome
///
/// \exception throws ERROR_EXCEPTION on failure
void handleAsyncRead(boost::system::error_code const& error, ///< holds result of the read
size_t bytesTransferred) ///< hold number of bytes read
{
try {
m_timer.cancel();
m_bytesTransferred = bytesTransferred;
if (!connection()->isTimedOut()) {
if (!error) {
m_transferredBytesLeftToProcess = m_bytesTransferred;
}
else {
if (boost::asio::error::eof != error) {
try {
networkErrorMsg(boost::lexical_cast<std::string>(error));
networkErrorMsg(" ");
networkErrorMsg(error.message());
}
catch (...) {
networkErrorMsg("handleAsyncRead unknown error");
}
}
m_eof = true; // alwasy set to eof even if not eof to prevent more read attempts
}
}
}
catch (std::exception const & e) {
if (!connection()->isTimedOut()) {
networkErrorMsg(e.what());
}
}
catch (...) {
if (!connection()->isTimedOut()) {
networkErrorMsg("unknown error");
}
}
}
/// \brief reads data from the socket until delimitor is found or error or timeout expires
///
/// if socket times our being used, will read until delimitor found, error or timeout expires,
/// otherwise will pwerform async read until
///
/// \exception throws ERROR_EXCEPTION on failure
void readUntil(boost::asio::streambuf& buffer, ///< points to bufer to recevie read data
char const* delimitor) ///< delinitor to read until
{
if (usingSocketTimeouts()) {
connection()->readUntil(buffer, "\r\n");
}
else {
asyncReadUntil(buffer, delimitor);
}
m_lastConnectionActivity = time(0);
}
/// \brief async reads data from the socket until delimitor is found or error or timeout expires
///
/// blocks until at least 1 byte is read, an error is encountered, or read times out
///
/// \exception throws ERROR_EXCEPTION on failure
void asyncReadUntil(boost::asio::streambuf& buffer, ///< points to bufer to recevie read data
char const* delimitor) ///< delinitor to read until
{
if (!connection()->isTimedOut()) {
connection()->asyncReadUntil(buffer,
delimitor,
boost::bind(&BasicClient<PROTOCOL_TRAITS>::handleAsyncReadUntil,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
runAsyncRequest(m_timer,
boost::bind(&BasicClient<PROTOCOL_TRAITS>::handleTimeout,
this,
boost::asio::placeholders::error));
if (connection()->isTimedOut()) {
throw ERROR_EXCEPTION << "timed out (" << m_timeoutSeconds << ')';
}
if (networkError()) {
throw ERROR_EXCEPTION << "read data from socket failed: " << networkErrorMsg();
}
}
else {
throw ERROR_EXCEPTION << "timed out (" << m_timeoutSeconds << ')';
}
}
/// \brief callback for processing data read by asyncReadSome
///
/// \exception throws ERROR_EXCEPTION on failure
void handleAsyncReadUntil(boost::system::error_code const& error, ///< holds result of the read
size_t bytesTransferred) ///< hold number of bytes read
{
try {
m_timer.cancel();
m_bytesTransferred = bytesTransferred;
if (!connection()->isTimedOut()) {
if (error) {
if (boost::asio::error::eof != error) {
try {
networkErrorMsg(boost::lexical_cast<std::string>(error));
networkErrorMsg(" ");
networkErrorMsg(error.message());
}
catch (...) {
networkErrorMsg("handleAsyncRead unknown error");
}
}
m_eof = true; // alwasy set to eof even if not eof to prevent more read attempts
}
}
}
catch (std::exception const & e) {
if (!connection()->isTimedOut()) {
networkErrorMsg(e.what());
}
}
catch (...) {
if (!connection()->isTimedOut()) {
networkErrorMsg("unknown error");
}
}
}
/// \brief converts the responseCode to a client ResultCode
///
/// \return
/// \li \c CLIENT_RESULT_OK: on success and no more data to receive
/// \li \c CLIENT_RESULT_MORE_DATA: on success and more data to reveive
/// \li \c CLIENT_RESULT_NOT_FOUND: on success but no (matching) file(s) found
///
/// \exception ERROR_EXCEPTION if the request was not successful
// Todo:sadewang - currently the response message is not sent in Response data. The default value is ""
// Check if that message is required and then add/remove it
ClientCode resultCode() {
switch (m_protocolHandler.responseCode()) {
case ResponseCode::RESPONSE_OK:
if (CLIENT_STATE_MORE_DATA != m_state) {
setResponseData(CLIENT_RESULT_OK, m_protocolHandler.parameters(), m_protocolHandler.headers());
return CLIENT_RESULT_OK;
}
else {
setResponseData(CLIENT_RESULT_MORE_DATA, m_protocolHandler.parameters(), m_protocolHandler.headers());
return CLIENT_RESULT_MORE_DATA;
}
case ResponseCode::RESPONSE_NOT_FOUND:
if (CLIENT_STATE_MORE_DATA == m_state) {
// throw away any remaining data
// it is not needed but needs to be read
readData();
}
setResponseData(CLIENT_RESULT_NOT_FOUND, m_protocolHandler.parameters(), m_protocolHandler.headers());
return CLIENT_RESULT_NOT_FOUND;
case ResponseCode::RESPONSE_REQUESTER_THROTTLED:
setResponseData(CLIENT_RESULT_NOSPACE, m_protocolHandler.parameters(), m_protocolHandler.headers());
readData();
throw THROTTLING_EXCEPTION << "Client throttled";
default:
{
setResponseData(CLIENT_RESULT_ERROR, m_protocolHandler.parameters(), m_protocolHandler.headers());
if (m_eof) {
throw ERROR_EXCEPTION << "socket returend EOF, but expecting more data to read";
}
std::string errStr;
readData(errStr);
errStr += std::string(" Responsecode: ") + boost::lexical_cast<std::string>(m_protocolHandler.responseCode());
throw ERROR_EXCEPTION << errStr;
}
}
}
/// \brief callback to handle timeouts
void handleTimeout(const boost::system::error_code& error) {
if (error != boost::asio::error::operation_aborted) {
connection()->cancel();
connection()->setTimedOut();
disconnect();
}
}
int connectTimeoutSeconds() {
return m_connectTimeoutSeconds;
}
bool networkError() {
return !m_networkError.empty();
}
std::string networkErrorMsg() {
return m_networkError;
}
void networkErrorMsg(std::string const& msg) {
m_networkError += msg;
}
void networkErrorMsg(char const* msg) {
m_networkError += msg;
}
bool usingSocketTimeouts()
{
return m_usingSocketTimeouts;
}
bool readCsRequestResponse(std::string& response)
{
boost::asio::streambuf responseStreamBuf;
readUntil(responseStreamBuf, "\r\n");
// Check that response is OK.
std::istream stream(&responseStreamBuf);
std::string httpVersion;
stream >> httpVersion;
unsigned int statusCode;
stream >> statusCode;
std::string statusMessage;
std::getline(stream, statusMessage);
if (!stream || httpVersion.substr(0, 5) != "HTTP/") {
response = "invalid response";
response += httpVersion;
response += " ";
response += boost::lexical_cast<std::string>(statusCode);
response += " ";
response += statusMessage;
return false;
}
bool ok = true;
if (statusCode != 200) {
response += "server returned: ";
response += boost::lexical_cast<std::string>(statusCode);
response += " - ";
ok = false;
}
readUntil(responseStreamBuf, "\r\n\r\n");
std::string header;
std::size_t contentLength = 0;
while (std::getline(stream, header) && header != "\r") {
if (boost::algorithm::starts_with(header, "Content-Length")) {
std::string::size_type startIdx = header.find_first_of("0123456789");
std::string::size_type endIdx = header.find_last_of("0123456789");
if (std::string::npos != startIdx) {
contentLength = boost::lexical_cast<std::size_t>(header.substr(startIdx, endIdx - startIdx + 1));
}
}
}
std::stringstream responseStringStream;
std::size_t remainingBytes = contentLength - responseStreamBuf.size();
if (responseStreamBuf.size()) {
responseStringStream << &responseStreamBuf;
}
std::size_t bytesRead;
boost::system::error_code ec;
if (remainingBytes > 0) {
std::vector<char> buf(remainingBytes);
// FIXME: handle read errors
bytesRead = connection()->read(&buf[0], remainingBytes);
responseStringStream.write(&buf[0], bytesRead);
}
else if (0 == contentLength) {
// no content length sent have to read until eof
std::vector<char> buf(4096);
while ((bytesRead = connection()->readSome(&buf[0], buf.size())) > 0) {
responseStringStream.write(&buf[0], bytesRead);
}
}
if (ec && boost::asio::error::eof != ec) {
response += "read failed: ";
response += boost::lexical_cast<std::string>(ec);
response += " - ";
ok = false;
}
response += responseStringStream.str();
return ok;
}
bool selfsignedMustMatch()
{
return m_selfsignedMustMatch;
}
private:
std::string m_ipAddress; ///< comma seperated ip addresses or fqdn of peer to connect to
std::string m_port; ///< port to connect to
std::string m_hostId; ///< host id of the client
std::string m_password; ///< connection passphrase
std::string m_cnonce; ///< random generated data
std::string m_snonce; ///< random generated data returned by the server
std::string m_sessionId; ///< session id returned by server
std::string m_connectedAddress; ///< connected address of peer
typename PROTOCOL_TRAITS::protocolHandler_t m_protocolHandler; ///< handles formatting requests and processing responses
boost::asio::io_service m_ioService; ///< required for boost.asio
bool m_keepAlive; ///< true: do not disconnect after a request finishes, false: dissconnect
ClientState m_state; ///< tracks the clients state
std::size_t m_dataSize; ///< size of the data being returned
std::size_t m_bytesTransferred; ///< bytes transferred from last read
std::size_t m_transferredBytesLeftToProcess; ///< bytes transferred that have not yet been processed
std::size_t m_dataBytesLeftToRead; ///< tracks amount of data left to read
int m_maxBufferSizeBytes; ///< max size to use for internal buffers
std::vector<char> m_buffer; ///< buffer for reading data from connection
int m_connectTimeoutSeconds; ///< inactive duration before considering a connect attempt timed out
int m_timeoutSeconds; ///< inactive duration before considering a connection timed out
boost::asio::deadline_timer m_timer; ///< used to check for timeouts
int m_sendWindowSizeBytes; ///< tcp send window size in bytes to use. 0: use system default
int m_receiveWindowSizeBytes; ///< tcp receive window size in bytes to use. 0: use system default
bool m_loggingOut;
std::time_t m_lastConnectionActivity; ////< last time any atcivity occured over the connection
std::time_t m_heartbeatIntervalSeconds; ///< holds duration of non-activity that requires a heartbeat to be sent
bool m_ioServiceRunning; ///< used to track if the io service is running true: do not need to call run, false: need to call run
writeNInfo m_writeNInfo; ///< holds async write n info
bool m_eof; ///< set if socket returned eof
std::string m_networkError; ///< holding any errors during async handling
bool m_usingSocketTimeouts; ///< indicates if socket timesout should be used (true: yes, false: no)
bool m_selfsignedMustMatch;
bool m_useFxLogin; ///< indicates if fx login should be used for fx jobs true: yse, false: no default no
bool m_useCertAuth;
};
/// \brief non ssl client for interacting with the cx process server
///
/// \sa protocolhandler.h for details about the parameterized type PROTOCOL_TRAITS
template <typename PROTOCOL_TRAITS>
class Client : public BasicClient<PROTOCOL_TRAITS>
{
public:
/// \brief constructor
explicit Client(std::string const& ipAddress, ///< server ip for connection
std::string const& port, ///< port for connection
std::string const& hostId, ///< host id of the client
int maxBufferSizeBytes, ///< max buffer size fore socket read/write
int connectTimeoutSeconds, ///< inactive duration before considering a connect attempt timed out
int timeoutSeconds, ///< inactive duration before considering a connection timed out
bool keepAlive, ///< should server keep connection alive true: yes, false: no
int sendWindowSizeBytes, ///< tcp send window size. 0 means uses system value
int receiveWindowSizeBytes, ///< tcp receive window size. 0 means uses system value
int writeMode, ///< write mode to use see writemode.h for possible modes
std::string const& password, ///< connection passphrase
std::time_t heartbeatIntervalSeconds = HEARTBEAT_INTERVAL_SECONDS, ///< interval in seconds when a heartbeat should be sent if no other activity
bool useFxLogin = false ///< indicates if fx login should be used. true: yes, false: no default no
)
: BasicClient<PROTOCOL_TRAITS>(ipAddress,
port,
hostId,
maxBufferSizeBytes,
connectTimeoutSeconds,
timeoutSeconds,
keepAlive,
sendWindowSizeBytes,
receiveWindowSizeBytes,
heartbeatIntervalSeconds,
writeMode,
password,
useFxLogin),
m_connection(new Connection(this->ioService())),
m_timer(this->ioService())
{}
/// \brief destructor
virtual ~Client()
{
this->logout();
}
protected:
/// \brief get a pointer to the connection object
///
/// \return
/// \li \c pointer to the connection object used by this client
/// \see ConnectionAbc::ptr details on the pointer type
virtual ConnectionAbc::ptr connection() {
return m_connection;
}
/// \brief connects to the given endpoint
///
/// before connecting sets the provided window sizes on the socket as that needs to be done
/// before connection
///
/// \note
/// \li \c 0 for a window size means use system default value
virtual void syncConnect(boost::asio::ip::tcp::endpoint const& endpoint, ///< peer to connect to
int sendWindowSizeBytes, ///< tcp send window size to use (overrides system setting)
int receiveWindowSizeBytes) ///< tcp receive window size to use (overrides system setting)
{
m_connection->connect(endpoint, sendWindowSizeBytes, receiveWindowSizeBytes);
}
/// \brief async connects to the given endpoint
///
/// \note
/// \li \c 0 for a window size means use system default value
virtual void asyncConnect(boost::asio::ip::tcp::endpoint const& endpoint, ///< peer to connect to
int sendWindowSizeBytes, ///< tcp send window size to use (overrides system setting)
int receiveWindowSizeBytes) ///< tcp receive window size to use (overrides system setting)
{
m_connection->asyncConnect(endpoint,
sendWindowSizeBytes,
receiveWindowSizeBytes,
boost::bind(&Client<PROTOCOL_TRAITS>::handleAsyncConnect,
this,
boost::asio::placeholders::error));
this->runAsyncRequest(m_timer,
boost::bind(&Client::handleTimeout,
this,
boost::asio::placeholders::error),
this->connectTimeoutSeconds());
if (m_connection->isTimedOut()) {
throw ERROR_EXCEPTION << "timed out (" << this->timeoutSeconds() << ')';
}
if (this->networkError()) {
throw ERROR_EXCEPTION << "connect socket failed: " << this->networkErrorMsg();
}
}
void handleAsyncConnect(boost::system::error_code const & error)
{
m_timer.cancel();
if (error) {
try {
this->networkErrorMsg(boost::lexical_cast<std::string>(error));
this->networkErrorMsg(" ");
this->networkErrorMsg(error.message());
}
catch (...) {
this->networkErrorMsg("handleAsyncConnect unknown error");
}
}
else {
m_connection->setPeerIpAddress();
}
}
/// \brief callback to handle timeouts
void handleTimeout(const boost::system::error_code& error) {
if (error != boost::asio::error::operation_aborted) {
m_connection->cancel();
m_connection->setTimedOut();
this->disconnect();
}
}
private:
ConnectionAbc::ptr m_connection; ///< connection object
boost::asio::deadline_timer m_timer; ///< used to check for connect timeouts
};
/// \brief ssl client for interacting with the cx process server
///
/// \sa protocolhandler.h for details about the parameterized type PROTOCOL_TRAITS
template <typename PROTOCOL_TRAITS>
class SslClient : public BasicClient<PROTOCOL_TRAITS> {
public:
/// \brief constructor
explicit SslClient(std::string const& ipAddress, ///< server ip for connection
std::string const& port, ///< port for connection
std::string const& hostId, ///< host id of the client
int maxBufferSizeBytes, ///< max buffer size fore socket read/write
int connectTimeoutSeconds, ///< inactive duration before considering a connect attempt timed out
int timeoutSeconds, ///< inactive duration before considering a connection timed out
bool keepAlive, ///< should server keep connection alive true: yes, false: no
int sendWindowSizeBytes, ///< tcp send window size. 0 means uses system value
int receiveWindowSizeBytes, ///< tcp receive window size. 0 means uses system value
std::string const& clientPemFile, ///< ssl client pem file to use for server verification
int writeMode, ///< write mode to be used see writemode.h for possible modes
std::string const& password = std::string(), ///< password for login
std::time_t heartbeatIntervalSeconds = HEARTBEAT_INTERVAL_SECONDS, ///< interval in seconds when a heartbeat should be sent if no other activity
bool useFxLogin = false ///< indicates if fx login should be used. true: yes, false: no default no
)
: BasicClient<PROTOCOL_TRAITS>(ipAddress,
port,
hostId,
maxBufferSizeBytes,
connectTimeoutSeconds,
timeoutSeconds,
keepAlive,
sendWindowSizeBytes,
receiveWindowSizeBytes,
heartbeatIntervalSeconds,
writeMode,
password,
useFxLogin),
m_timer(this->ioService())
{
m_connection.reset(new SslConnection(this->ioService(), clientPemFile));
}
/// \brief constructor
explicit SslClient(std::string const& ipAddress, ///< server ip for connection
std::string const& port, ///< port for connection
std::string const& hostId, ///< host id of the client
int maxBufferSizeBytes, ///< max buffer size fore socket read/write
int connectTimeoutSeconds, ///< inactive duration before considering a connect attempt timed out
int timeoutSeconds, ///< inactive duration before considering a connection timed out
bool keepAlive, ///< should server keep connection alive true: yes, false: no
int sendWindowSizeBytes, ///< tcp send window size. 0 means uses system value
int receiveWindowSizeBytes, ///< tcp receive window size. 0 means uses system value
std::string const& certFile, ///< ssl client pem file to use for client cert
std::string const& keyFile, ///< ssl client pem file to use for client private key
std::string const& serverCertThumbprint, ///< thumbprint to be used to verify server cert
int writeMode, ///< write mode to be used see writemode.h for possible modes
std::time_t heartbeatIntervalSeconds = HEARTBEAT_INTERVAL_SECONDS, ///< interval in seconds when a heartbeat should be sent if no other activity
bool useFxLogin = false ///< indicates if fx login should be used. true: yes, false: no default no
)
: BasicClient<PROTOCOL_TRAITS>(ipAddress,
port,
hostId,
maxBufferSizeBytes,
connectTimeoutSeconds,
timeoutSeconds,
keepAlive,
sendWindowSizeBytes,
receiveWindowSizeBytes,
heartbeatIntervalSeconds,
writeMode),
m_timer(this->ioService())
{
m_connection.reset(new SslConnection(this->ioService(), certFile, keyFile, serverCertThumbprint));
}
/// \brief destructor
virtual ~SslClient()
{
this->logout();
}
/// \brief connects to the given endpoint
///
/// before connecting sets the provided window sizes on the socket as that needs to be done
/// before connection
///
/// \note
/// \li \c 0 for a window size means use system default value
virtual void syncConnect(boost::asio::ip::tcp::endpoint const& endpoint, ///< peer to connect to
int sendWindowSizeBytes, ///< tcp send window size to use (overrides system setting)
int receiveWindowSizeBytes) ///< tcp receive window size to use (overrides system setting)
{
m_connection->connect(endpoint, sendWindowSizeBytes, receiveWindowSizeBytes);
dynamic_cast<SslConnection*>(m_connection.get())->sslHandshake();
m_connection->sslHandshakeCompleted();
}
/// \brief async connects to the given endpoint
///
/// \note
/// \li \c 0 for a window size means use system default value
virtual void asyncConnect(boost::asio::ip::tcp::endpoint const& endpoint, ///< peer to connect to
int sendWindowSizeBytes, ///< tcp send window size to use (overrides system setting)
int receiveWindowSizeBytes) ///< tcp receive window size to use (overrides system setting)
{
m_connection->asyncConnect(endpoint,
sendWindowSizeBytes,
receiveWindowSizeBytes,
boost::bind(&SslClient::handleAsyncConnect,
this,
boost::asio::placeholders::error));
this->runAsyncRequest(m_timer,
boost::bind(&SslClient::handleTimeout,
this,
boost::asio::placeholders::error),
this->connectTimeoutSeconds());
if (m_connection->isTimedOut()) {
throw ERROR_EXCEPTION << "timed out (" << this->timeoutSeconds() << ')';
}
if (this->networkError()) {
throw ERROR_EXCEPTION << "connect socket failed: " << this->networkErrorMsg();
}
asyncHandshake();
}
void handleAsyncConnect(boost::system::error_code const & error)
{
m_timer.cancel();
if (error) {
try {
this->networkErrorMsg(boost::lexical_cast<std::string>(error));
this->networkErrorMsg(" ");
this->networkErrorMsg(error.message());
}
catch (...) {
this->networkErrorMsg("handleAsyncConnect unknown error");
}
}
else {
m_connection->setPeerIpAddress();
}
}
void asyncHandshake()
{
dynamic_cast<SslConnection*>(m_connection.get())->asyncSslHandshake(boost::bind(&SslClient::handleAsyncSslHandshake,
this,
boost::asio::placeholders::error));
this->runAsyncRequest(m_timer,
boost::bind(&SslClient::handleTimeout,
this,
boost::asio::placeholders::error));
if (m_connection->isTimedOut()) {
throw ERROR_EXCEPTION << "timed out (" << this->timeoutSeconds() << ')';
}
if (this->networkError()) {
throw ERROR_EXCEPTION << "ssl handshake failed: " << this->networkErrorMsg();
}
}
/// \brief disconnects from the cx process server
virtual void disconnect()
{
try {
if (BasicClient<PROTOCOL_TRAITS>::CLIENT_STATE_NEEDS_TO_CONNECT != this->state()
&& !this->networkError()
&& !m_connection->isTimedOut()) {
if (this->usingSocketTimeouts()) {
sslShutdown();
m_connection->sslShutdownCompleted();
}
else {
asyncSslShutdown();
}
}
}
catch (std::exception const& e) {
this->networkErrorMsg(e.what());
}
BasicClient<PROTOCOL_TRAITS>::disconnect();
}
/// \brief perform async ssl shutdown
void asyncSslShutdown()
{
if (dynamic_cast<SslConnection*>(m_connection.get())->asyncSslShutdown(boost::bind(&SslClient::handleAsyncSslShutdown,
this,
boost::asio::placeholders::error))) {
this->runAsyncRequest(m_timer,
boost::bind(&SslClient::handleTimeout,
this,
boost::asio::placeholders::error),
60); // TODO: make customizable
}
}
/// brief perform sync ssl shutdown
void sslShutdown()
{
dynamic_cast<SslConnection*>(m_connection.get())->sslShutdown();
}
/// \brief handler for async ssl connect
void handleAsyncSslHandshake(boost::system::error_code const & error) ///< holds result of connect
{
try {
m_timer.cancel();
if (error) {
this->networkErrorMsg(boost::lexical_cast<std::string>(error));
this->networkErrorMsg(" ");
this->networkErrorMsg(error.message());
}
else {
m_connection->sslHandshakeCompleted();
}
}
catch (std::exception const& e) {
this->networkErrorMsg("handleAsyncSslHandshake completing failed: ");
this->networkErrorMsg(e.what());
}
}
/// \brief handler for async ssl disconnect
void handleAsyncSslShutdown(boost::system::error_code const & error) ///< holds result of disconnect
{
m_timer.cancel();
m_connection->sslShutdownCompleted();
if (error) {
try {
this->networkErrorMsg(boost::lexical_cast<std::string>(error));
}
catch (...) {
this->networkErrorMsg("handleAsyncSslShutdown unknown error");
}
}
BasicClient<PROTOCOL_TRAITS>::disconnect();
}
/// \brief callback to handle timeouts
void handleTimeout(const boost::system::error_code& error) {
if (error != boost::asio::error::operation_aborted) {
connection()->cancel();
m_connection->setTimedOut();
m_connection->sslShutdownCompleted();
this->disconnect();
}
}
protected:
/// \brief get a pointer to the connection object
///
/// \return
/// \li \c pointer to the connection object used by this client
/// \see ConnectionAbc::ptr details on the pointer type
virtual ConnectionAbc::ptr connection() {
return m_connection;
}
private:
ConnectionAbc::ptr m_connection; ///< connection object
boost::asio::deadline_timer m_timer; ///< used to check for ssl connect, handshake and shutdown timeouts
};
/// \brief client used to interact with cfs
template <typename PROTOCOL_TRAITS>
class CfsClient : public Client<PROTOCOL_TRAITS> {
public:
/// \brief constructor
explicit CfsClient(std::string const& cfsLocalName, ///< cfs local name used to connect to cfs
std::string const& psId, ///< ps id for connection
std::string const& hostId, ///< host id of the client
std::string const& password, ///< password for login
int maxBufferSizeBytes, ///< max buffer size fore socket read/write
int connectTimeoutSeconds, ///< inactive duration before considering a connect attempt timed out
int timeoutSeconds, ///< inactive duration before considering a connection timed out
bool keepAlive, ///< should server keep connection alive true: yes, false: no
int sendWindowSizeBytes, ///< tcp send window size. 0 means uses system value
int receiveWindowSizeBytes, ///< tcp receive window size. 0 means uses system value
int writeMode, ///< write mode to use see writemode.h for possible modes
std::time_t heartbeatIntervalSeconds = HEARTBEAT_INTERVAL_SECONDS ///< interval in seconds when a heartbeat should be sent if no other activity
)
: Client<PROTOCOL_TRAITS>(std::string(), // not used for cfs client
std::string(), // not used for cfs client
hostId,
maxBufferSizeBytes,
connectTimeoutSeconds,
timeoutSeconds,
keepAlive,
sendWindowSizeBytes,
receiveWindowSizeBytes,
writeMode,
password,
heartbeatIntervalSeconds),
m_cfsLocalName(cfsLocalName),
m_psId(psId)
{
}
/// \brief sets up the socket connection through cfs
///
/// issues the cfs fwd connect request to the cfs and waits for it to pass back
/// the native socket that will be used for subsequent requests to cxps
virtual void connectSocket()
{
CfsIpcClient cfsIpcClient;
boost::asio::ip::tcp::socket::native_handle_type nativeSocket = cfsIpcClient.cfsFwdConnect(m_psId, m_cfsLocalName, false, this->timeoutSeconds());
this->connection()->lowestLayerSocket().assign(boost::asio::ip::tcp::v4(), nativeSocket);
this->setSocketTimeouts();
this->connection()->setPeerIpAddress();
}
protected:
private:
std::string m_cfsLocalName; ///< cfs local name needed to connect to cfs
std::string m_psId; ///< ps id used for finding the ps to connect
};
/// \brief client used to interact with cfs
template <typename PROTOCOL_TRAITS>
class CfsSslClient : public SslClient<PROTOCOL_TRAITS> {
public:
/// \brief constructor
CfsSslClient(std::string const& cfsLocalName, ///< cfs local name used to connect to cfs
std::string const& psId, ///< ps id for connection
std::string const& hostId, ///< host id of the client
std::string const& password, ///< password for login
int maxBufferSizeBytes, ///< max buffer size fore socket read/write
int connectTimeoutSeconds, ///< inactive duration before considering a connect attempt timed out
int timeoutSeconds, ///< inactive duration before considering a connection timed out
bool keepAlive, ///< should server keep connection alive true: yes, false: no
int sendWindowSizeBytes, ///< tcp send window size. 0 means uses system value
int receiveWindowSizeBytes, ///< tcp receive window size. 0 means uses system value
std::string const& clientPemFile, ///< ssl client pem file to use for server verification
int writeMode, ///< write mode to be used see writemode.h for possible modes
std::time_t heartbeatIntervalSeconds = HEARTBEAT_INTERVAL_SECONDS ///< interval in seconds when a heartbeat should be sent if no other activity
)
: SslClient<PROTOCOL_TRAITS>(std::string(), // not used for cfs client
std::string(), // not used for cfs client
hostId,
maxBufferSizeBytes,
connectTimeoutSeconds,
timeoutSeconds,
keepAlive,
sendWindowSizeBytes,
receiveWindowSizeBytes,
clientPemFile,
writeMode,
password,
heartbeatIntervalSeconds),
m_cfsLocalName(cfsLocalName),
m_psId(psId)
{
}
/// \brief sets up the socket connection through cfs
///
/// issues the cfs fwd connect request to the cfs and waits for it to pass back
/// the native socket that will be used for subsequent requests to cxps
virtual void connectSocket()
{
CfsIpcClient cfsIpcClient;
boost::asio::ip::tcp::socket::native_handle_type nativeSocket = cfsIpcClient.cfsFwdConnect(m_psId, m_cfsLocalName, true, this->timeoutSeconds());
this->connection()->lowestLayerSocket().assign(boost::asio::ip::tcp::v4(), nativeSocket);
this->connection()->setPeerIpAddress();
this->setSocketTimeouts();
if (this->usingSocketTimeouts()) {
dynamic_cast<SslConnection*>(this->connection().get())->sslHandshake();
this->connection()->sslHandshakeCompleted();
}
else {
this->asyncHandshake();
}
}
protected:
private:
std::string m_cfsLocalName; ///< cfs local name needed to connect to cfs
std::string m_psId; ///< ps id used for finding the ps to connect
};
/// \brief client used for local file
class FileClient : public ClientAbc {
public:
typedef std::pair<std::string, std::string> remapPrefixFromTo_t; ///< Remap prefix type for file operations
explicit FileClient(int writeMode,
const remapPrefixFromTo_t &remapPrefixFromTo = remapPrefixFromTo_t()) ///< Remap prefix path for file operations
: ClientAbc(writeMode),
m_compress(false),
m_getFileBytesLeft(0),
m_remapPrefixFromTo(remapPrefixFromTo)
{}
virtual ~FileClient() {}
/// \brief abort an in progess request
virtual void abortRequest(bool disconnectOnly = false)
{
reset();
}
/// \brief issue put file request to write data to a file
///
/// use to write data directly to a file. If the total size of the data to be sent
/// is not pre-calculated then multiple put file requests can be used to send the data.
///
/// \exception throws ERROR_EXCEPTION on failure
///
/// \note
/// \li \c if more then 1 put file request is needed to send all the data, then no other
/// requests can be sent until a put file request with moreData set to false is issued
/// \li \c if all the data has been sent before knowing there is no more data (i.e. a
/// putfile request with moreData set to false has not been sent), then 1 additional put
/// file request with moreData set to false still needs to be sent to let the server know
/// all the data has been sent. In this case set dataSize to 0 and data to NULL (0)
virtual void putFile(std::string const& remoteName, ///< name of remote file to put the data into
std::size_t dataSize, ///< size of the data being sent in this request
char const * data, ///< the data to put in the file
bool moreData, ///< if there is more data to be sent true: yes, false: no
COMPRESS_MODE compressMode, ///< compress mode (see volumegroupsettings.h)
bool createDirs = false, ///< indicates if missing dirs should be created (true: yes, false: no)
long long offset = PROTOCOL_DO_NOT_SEND_OFFSET ///< offset to write at
)
{
putFile(remoteName, dataSize, data, moreData, compressMode, s_emptyHeader, createDirs, offset);
}
virtual void putFile(std::string const& remoteName, ///< name of remote file to put the data into
std::size_t dataSize, ///< size of the data being sent in this request
char const * data, ///< the data to put in the file
bool moreData, ///< if there is more data to be sent true: yes, false: no
COMPRESS_MODE compressMode, ///< compress mode (see volumegroupsettings.h)
mapHeaders_t const & headers, ///< additional headers to send in putfile request
bool createDirs = false, ///< indicates if missing dirs should be created (true: yes, false: no)
long long offset = PROTOCOL_DO_NOT_SEND_OFFSET ///< offset to write at
)
{
boost::filesystem::path fullPathName;
getFullPathName(remoteName, fullPathName);
// TODO: is preventing other requests before putFile has received moreData = false or aborted required
// TODO: is this good enough
if (!m_name.empty() && m_name != fullPathName.string()) {
throw ERROR_EXCEPTION << "you must complete putFile for " << m_name << " or abort before getting a new file (" << fullPathName.string() << ')';
}
if (!m_fio.is_open()) {
m_name = fullPathName.string();
extendedLengthPath_t extName(ExtendedLengthPath::name(fullPathName.string()));
if (boost::filesystem::exists(extName)) {
// TODO: for now it will be overwritten
}
createPathsAsNeeded(fullPathName.string());
if (!m_fio.open(extName.string().c_str(), FIO::FIO_OVERWRITE)) {
throw ERROR_EXCEPTION << "open file " << fullPathName.string() << " failed: " << m_fio.errorAsString();
}
}
if (PROTOCOL_DO_NOT_SEND_OFFSET != offset) {
m_fio.seek(offset, SEEK_SET);
}
if (m_fio.write(data, (long)dataSize) < (long)dataSize) {
throw ERROR_EXCEPTION << "write to file " << fullPathName.string() << " failed: " << m_fio.errorAsString();
}
if (!moreData) {
if (WRITE_MODE_FLUSH == writeMode() && !m_fio.flushToDisk()) {
throw ERROR_EXCEPTION << "flush data to disk for file " << fullPathName.string() << " failed: " << m_fio.errorAsString();
}
reset();
}
}
/// \brief issue put file request to send a local file to a remote file
///
/// use to send a local file to a remote file.
///
/// \exception throws ERROR_EXCEPTION on failure or localName not found
///
/// \note
/// \li \c always uses binary mode
///
virtual void putFile(std::string const& remoteName, ///< name of remote file to put the data into
std::string const& localName, ///< name of local file to send
COMPRESS_MODE compressMode, ///< compress mode (see volumegroupsettings.h)
bool createDirs = false ///< indicates if missing dirs should be created (true: yes, false: no)
)
{
putFile(remoteName, localName, compressMode, s_emptyHeader, createDirs);
}
virtual void putFile(std::string const& remoteName, ///< name of remote file to put the data into
std::string const& localName, ///< name of local file to send
COMPRESS_MODE compressMode, ///< compress mode (see volumegroupsettings.h)
mapHeaders_t const & headers, ///< additional headers to be sent in putfile request
bool createDirs = false ///< indicates if missing dirs should be created (true: yes, false: no)
)
{
boost::filesystem::path fullPathName;
getFullPathName(remoteName, fullPathName);
if (CLIENT_RESULT_NOT_FOUND == copyFile(localName, fullPathName.string(), compressMode)) {
throw ERROR_EXCEPTION << "local file " << localName << " not found";
}
}
/// \brief issue rename file request
///
/// \note
/// \li \c finalPaths should be used to tell transport it should perform the "final" rename instead of time shot manager
/// it should be a semi-conlon (';') separated list of the final paths that the file should get a "copy" of the new file.
/// hard links are used to create the "copies" unless transport server was built with RENAME_COPY_ON_FAILED_LINK defined.
/// in that case transport will attempt to copy the file if the hard link fails.
///
/// \exception ERROR_EXCEPTION on failure
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: oldName not found
virtual ClientCode renameFile(std::string const& fromName, ///< the name of the file that is to be renamed
std::string const& toName, ///< the new name to use
COMPRESS_MODE compressMode, ///< compress mode in affect (see compressmode.h)
mapHeaders_t const& headers, ///< additional headers to be sent in renamefile request
std::string const& finalPaths = std::string() ///< semi-colon (';') separated list of all paths that should get a "copy" of the renamed file
)
{
boost::filesystem::path oldFullPathName, newFullPathName;
try {
getFullPathName(fromName, oldFullPathName);
getFullPathName(toName, newFullPathName);
std::string oldName(oldFullPathName.string());
std::string newName(newFullPathName.string());
extendedLengthPath_t extOldName(ExtendedLengthPath::name(oldName));
if (!boost::filesystem::exists(extOldName)) {
return CLIENT_RESULT_NOT_FOUND;
}
extendedLengthPath_t extNewName(ExtendedLengthPath::name(newName));
if (finalPaths.empty()) {
// boost rename is too restrictive, so for case were rename
// should be allowed, need to delete new name if it exists
if (boost::filesystem::exists(extNewName)
&& boost::filesystem::is_regular_file(extNewName)) {
boost::filesystem::remove(extNewName);
}
boost::filesystem::rename(extOldName, extNewName);
}
else {
RenameFinal::rename(extOldName, extNewName, true, finalPaths, true,
MAKE_GET_FULL_PATH_CALLBACK_MEM_FUN(&FileClient::getFullPathName, this));
}
}
catch (std::exception const & e) {
throw ERROR_EXCEPTION << oldFullPathName.string() << " to " << newFullPathName.string() << " failed: " << e.what();
}
return CLIENT_RESULT_OK;
}
/// \brief issue rename file request
///
/// \note
/// \li \c finalPaths should be used to tell transport it should perform the "final" rename instead of time shot manager
/// it should be a semi-conlon (';') separated list of the final paths that the file should get a "copy" of the new file.
/// hard links are used to create the "copies" unless transport server was built with RENAME_COPY_ON_FAILED_LINK defined.
/// in that case transport will attempt to copy the file if the hard link fails.
///
/// \exception ERROR_EXCEPTION on failure
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: oldName not found
virtual ClientCode renameFile(std::string const& fromName, ///< the name of the file that is to be renamed
std::string const& toName, ///< the new name to use
COMPRESS_MODE compressMode, ///< compress mode in affect (see compressmode.h)
std::string const& finalPaths = std::string() ///< semi-colon (';') separated list of all paths that should get a "copy" of the renamed file
)
{
return renameFile(fromName, toName, compressMode, s_emptyHeader, finalPaths);
}
/// \brief issue delete file request
///
/// see documenation for FindDelete for complete details
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: remote file not found
/// \exception ERROR_EXCEPTION on failure
virtual ClientCode deleteFile(std::string const& names, ///< semi-colon seprated list of files and/or dirs to delete
int mode = FindDelete::FILES_ONLY ///< delete mode to use (FILES_ONLY, RECURSE_DIRS, or can combine both using logical-or ('|'))
)
{
return deleteFile(names, std::string(), mode);
}
/// \brief issue delete file request
///
/// see documenation for FindDelete for complete details
///
/// \return
/// \li \c CLIENT_RESULT_OK : on success
/// \li \c CLIENT_RESULT_NOT_FOUND: remote file not found
/// \exception ERROR_EXCEPTION on failure
virtual ClientCode deleteFile(std::string const& names, ///< semi-colon seprated list of files and/or dirs to delete
std::string const& fileSpec, ///< file spce to use to match file/dir names
int mode = FindDelete::FILES_ONLY ///< delete mode to use (FILES_ONLY, RECURSE_DIRS, or can combine both using logical-or ('|'))
)
{
std::string result;
try {
result = FindDelete::remove(names, fileSpec, mode,
MAKE_GET_FULL_PATH_CALLBACK_MEM_FUN(&FileClient::getFullPathName, this));
}
catch (std::exception const & e) {
throw ERROR_EXCEPTION << names << " - " << fileSpec << " - " << mode << " failed: " << e.what();
}
catch (...) {
throw ERROR_EXCEPTION << names << " - " << fileSpec << " - " << mode << " failed: unknown exception.";
}
if (!result.empty()) {
throw ERROR_EXCEPTION << names << " - " << fileSpec << " - " << mode << " failed: " << result;
}
return CLIENT_RESULT_OK;
}
/// \brief issue list file request
virtual ClientCode listFile(std::string const& fileSpec,
std::string & files)
{
try {
boost::filesystem::path fileSpecPath;
getFullPathName(fileSpec, fileSpecPath);
ListFile::listFileGlob(fileSpecPath, files);
}
catch (std::exception const & e) {
throw ERROR_EXCEPTION << "listFileGlob failed: " << e.what();
}
return (files.empty() ? CLIENT_RESULT_NOT_FOUND : CLIENT_RESULT_OK);
}
/// \brief issue get file request to get a remote file to a buffer
virtual ClientCode getFile(std::string const& name,
std::size_t dataSize,
char * data,
std::size_t& bytesReturned)
{
boost::filesystem::path fullPathName;
getFullPathName(name, fullPathName);
// TODO: is preventing other requests before getFile has reached eof or aborted required
// TODO: is this good enough
if (!m_name.empty() && m_name != fullPathName.string()) {
throw ERROR_EXCEPTION << "you must complete getFile for " << m_name << " or abort before getting a new file (" << fullPathName.string() << ')';
}
if (!m_fio.is_open()) {
m_name = fullPathName.string();
extendedLengthPath_t extName(ExtendedLengthPath::name(fullPathName.string()));
if (!boost::filesystem::exists(extName)) {
throw ERROR_EXCEPTION << fullPathName.string() << " not found";
}
if (!m_fio.open(extName.string().c_str(), FIO::FIO_READ_EXISTING | FIO::FIO_NOATIME)) {
throw ERROR_EXCEPTION << "open file " << fullPathName.string() << " failed: " << m_fio.errorAsString();
}
// TODO: this does not work if using volumes instead of files
m_getFileBytesLeft = m_fio.seek(0, SEEK_END);
m_fio.seek(0, SEEK_SET);
}
bytesReturned = m_fio.read(data, dataSize);
if (m_fio.bad()) {
throw ERROR_EXCEPTION << "read file " << fullPathName.string() << " failed: " << m_fio.errorAsString();
}
m_getFileBytesLeft -= bytesReturned;
if (m_fio.eof() || 0 == m_getFileBytesLeft) {
reset();
return CLIENT_RESULT_OK;
}
return CLIENT_RESULT_MORE_DATA;
}
virtual ClientCode getFile(std::string const& name,
size_t offset,
std::size_t dataSize,
char * data,
std::size_t& bytesReturned)
{
return getFile(name, dataSize, data, bytesReturned);
}
/// \brief issue get file request to get a remote file to a local file
virtual ClientCode getFile(std::string const& remoteName,
std::string const& localName)
{
boost::filesystem::path fullPathName;
getFullPathName(remoteName, fullPathName);
return copyFile(fullPathName.string(), localName, COMPRESS_NONE);
}
virtual ClientCode getFile(std::string const& remoteName,
std::string const& localName,
std::string& checksum)
{
throw ERROR_EXCEPTION << "not implemented for FileClient";
}
/// \brief issue heartbeat
///
/// alwasy returns CLIENT_RESULT_OK for FileClient
virtual ClientCode heartbeat(bool forceSend = false)
{
return CLIENT_RESULT_OK;
}
virtual std::string hostId()
{
return std::string();
}
virtual std::string ipAddress()
{
return std::string();
}
virtual std::string port()
{
return std::string();
}
virtual int timeoutSeconds()
{
return 0;
}
virtual bool sendCsGetRequest(std::string const& request, std::string& response)
{
return false;
}
virtual std::string password()
{
return std::string();
}
virtual void csConnect(std::string& fingerprint, std::string& certificate)
{
throw ERROR_EXCEPTION << "not implemented for FileClient";
}
virtual bool sendCsRequest(std::string const& request, std::string& response)
{
throw ERROR_EXCEPTION << "not implemented for FileClient";
}
protected:
/// \brief copies sourceName file to targetName file
///
/// copyFile will attempt to create a hard link between the source and target
/// if that fails, it will do an actual read source and write to target
///
/// \return
/// \li CLIENT_RESULT_OK if no errors
/// \li CLIENT_RESULT_NOT_FOUND if sourceName not found
/// \li throws exception on other errors
/// \exception ERROR_EXCEPTION on failure
ClientCode copyFile(std::string const& sourceName, ///< name of file to be copied
std::string const& targetName, ///< name of file to recevie the copy
COMPRESS_MODE compressMode ///< compress mode requested
)
{
// TODO honor compressMode
extendedLengthPath_t inExtName(ExtendedLengthPath::name(sourceName));
extendedLengthPath_t outExtName(ExtendedLengthPath::name(targetName));
if (!boost::filesystem::exists(inExtName)) {
return CLIENT_RESULT_NOT_FOUND;
}
if (boost::filesystem::exists(outExtName)) {
boost::filesystem::remove(outExtName);
}
createPathsAsNeeded(targetName);
try {
boost::filesystem::create_hard_link(inExtName, outExtName);
return CLIENT_RESULT_OK;
}
catch (...) {
// nothing to do here
}
// need to copy the file ourselves
// TODO: on windows may want to use CopyFile
FIO::Fio inFio;
FIO::Fio outFio;
try {
long bytes;
std::vector<char> buffer(1024 * 1024);
outFio.open(outExtName.string().c_str(), FIO::FIO_OVERWRITE);
inFio.open(inExtName.string().c_str(), FIO::FIO_READ_EXISTING);
while (inFio.good() && outFio.good()) {
bytes = inFio.read(&buffer[0], buffer.size());
outFio.write(&buffer[0], bytes);
}
if (WRITE_MODE_FLUSH == writeMode() && !outFio.flushToDisk()) {
throw ERROR_EXCEPTION << "flush data to disk for file " << targetName << " failed: " << outFio.errorAsString();
}
}
catch (std::exception const& e) {
throw ERROR_EXCEPTION << "copy file " << sourceName << " to " << targetName << " failed: " << e.what();
}
if (inFio.bad() || outFio.bad()) {
throw ERROR_EXCEPTION << "copy file " << sourceName << " to " << targetName << " failed: "
<< (inFio.bad() ? inFio.errorAsString() : "") << " - "
<< (outFio.bad() ? outFio.errorAsString() : "");
}
return CLIENT_RESULT_OK;
}
/// \brief creates and missing paths to the extName
void createPathsAsNeeded(boost::filesystem::path const& name)
{
// TODO: combine with requesthandler version
try {
boost::filesystem::path parentDirs(name.parent_path());
boost::filesystem::path::iterator iter(parentDirs.begin());
boost::filesystem::path::iterator iterEnd(parentDirs.end());
boost::filesystem::path createDir;
// skip over root name as on windows it is typically the drive
// letter colon name, which can not be created as a directory
// if not on windows there is no root name
if (name.has_root_name()) {
createDir /= *iter;
++iter;
}
// skip over root directory as it is the top most
// directory '/' on all systems. if it does not exist
// then there is a bigger problem and it will fail
// when trying to create it (most likely trying to use
// a disk that does not have a file system on it)
if (name.has_root_directory()) {
createDir /= *iter;
++iter;
}
for (/* empty */; iter != iterEnd; ++iter) {
createDir /= *iter;
extendedLengthPath_t extName(ExtendedLengthPath::name(createDir.string()));
if (!boost::filesystem::exists(extName)) {
boost::filesystem::create_directory(extName);
}
}
}
catch (std::exception const& e) {
throw ERROR_EXCEPTION << "creating parent directories failed: " << e.what();
}
}
/// \brief resets internal state
void reset()
{
m_fio.close();
m_name.clear();
m_compress = false;
m_getFileBytesLeft = 0;
}
/// \brief connect is a no-op for file client
virtual void syncConnect(boost::asio::ip::tcp::endpoint const&, ///< peer to connect to
int, ///< tcp send window size to use (overrides system setting)
int) ///< tcp receive window size to use (overrides system setting)
{
// no-op
}
/// \brief async connect is a no-op for file client
virtual void asyncConnect(boost::asio::ip::tcp::endpoint const&, ///< peer to connect to
int, ///< tcp send window size to use (overrides system setting)
int ///< tcp receive window size to use (overrides system setting)
)
{
// no-op for file client
}
/// \brief converts file names into their full paths
void getFullPathName(std::string const& name, ///< file name to be converted
boost::filesystem::path& fullPath ///< recevies the full path
)
{
fullPath = name;
if (!(m_remapPrefixFromTo.first.empty() || m_remapPrefixFromTo.second.empty()) && STARTS_WITH(name, m_remapPrefixFromTo.first)) {
fullPath = (m_remapPrefixFromTo.second);
fullPath /= name.substr(m_remapPrefixFromTo.first.size());
}
}
private:
FIO::Fio m_fio; ///< put/get file when using in-memory buffer and file
std::string m_name; ///< name of m_fio when it is opened
bool m_compress; ///< indicates if putFile should be comprssed before writting to disk
FIO::offset_t m_getFileBytesLeft; ///< number of bytes of get file left to send
remapPrefixFromTo_t m_remapPrefixFromTo; ///< Remap prefix path for file operations
};
/// \brief used for creating non-ssl HTTP client
typedef Client<HttpTraits> HttpClient_t;
/// \brief used for creating ssl HTTP client (https)
typedef SslClient<HttpTraits> HttpsClient_t;
/// \brief used for creating non-ssl HTTP cfs client
typedef CfsClient<HttpTraits> HttpCfsClient_t;
/// \brief used for creating ssl HTTP client (https)
typedef CfsSslClient<HttpTraits> HttpsCfsClient_t;
/// \brief used for creating file client
typedef FileClient FileClient_t;
#endif // CLIENT_H