wdtCmdLine.cpp (316 lines of code) (raw):

/** * Copyright (c) 2014-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #include <wdt/Receiver.h> #include <wdt/Wdt.h> #include <wdt/WdtResourceController.h> #include <folly/String.h> #include <gflags/gflags.h> #include <glog/logging.h> #include <signal.h> #include <chrono> #include <fstream> #include <future> #include <iostream> #include <thread> // Used in fbonly to add socket creator setup #ifndef ADDITIONAL_SENDER_SETUP #define ADDITIONAL_SENDER_SETUP #endif // This can be the fbonly (FbWdt) version (extended initialization, and options) #ifndef WDTCLASS #define WDTCLASS Wdt #endif // Flags not already in WdtOptions.h/WdtFlags.cpp.inc DEFINE_bool(fork, false, "If true, forks the receiver, if false, no forking/stay in fg"); DEFINE_bool(run_as_daemon, false, "If true, run the receiver as never ending process"); DEFINE_string(directory, ".", "Source/Destination directory"); DEFINE_string(manifest, "", "If specified, then we will read a list of files and optional " "sizes from this file, use - for stdin"); DEFINE_string( destination, "", "empty is server (destination) mode, non empty is destination host"); DEFINE_string(hostname, "", "override hostname in transfe request"); DEFINE_bool(parse_transfer_log, false, "If true, transfer log is parsed and fixed"); DEFINE_string(transfer_id, "", "Transfer id. Receiver will generate one to be used (via URL) on" " the sender if not set explicitly"); DEFINE_int32( protocol_version, 0, "Protocol version to use, this is used to simulate protocol negotiation"); DEFINE_string(connection_url, "", "Provide the connection string to connect to receiver" " (incl. transfer_id and other parameters)." " Deprecated: use - arg instead for safe encryption key" " transmission"); DECLARE_bool(logtostderr); // default of standard glog is off - let's set it on DEFINE_int32(abort_after_seconds, 0, "Abort transfer after given seconds. 0 means don't abort."); DEFINE_string(recovery_id, "", "Recovery-id to use for download resumption"); DEFINE_bool(treat_fewer_port_as_error, false, "If the receiver is unable to bind to all the ports, treat that as " "an error."); DEFINE_bool(print_options, false, "If true, wdt prints the option values and exits. Option values " "printed take into account option type and other command line " "flags specified."); DEFINE_bool(exit_on_bad_flags, true, "If true, wdt exits on bad/unknown flag. Otherwise, an unknown " "flags are ignored"); DEFINE_string(test_only_encryption_secret, "", "Test only encryption secret, to test url encoding/decoding"); DEFINE_string(app_name, "wdt", "Identifier used for reporting (scuba, at fb)"); DEFINE_string(namespace, "", "WDT namespace (e.g shard)"); DEFINE_string(dest_id, "", "Unique destination identifier (will default to hostname)"); DECLARE_bool(help); using namespace facebook::wdt; // TODO: move this to some util and/or delete template <typename T> std::ostream &operator<<(std::ostream &os, const std::set<T> &v) { std::copy(v.begin(), v.end(), std::ostream_iterator<T>(os, " ")); return os; } std::mutex abortMutex; std::condition_variable abortCondVar; bool isAbortCancelled = false; std::shared_ptr<WdtAbortChecker> setupAbortChecker() { int abortSeconds = FLAGS_abort_after_seconds; if (abortSeconds <= 0) { return nullptr; } WLOG(INFO) << "Setting up abort " << abortSeconds << " seconds."; static std::atomic<bool> abortTrigger{false}; auto res = std::make_shared<WdtAbortChecker>(abortTrigger); auto lambda = [=] { WLOG(INFO) << "Will abort in " << abortSeconds << " seconds."; std::unique_lock<std::mutex> lk(abortMutex); bool isNotAbort = abortCondVar.wait_for(lk, std::chrono::seconds(abortSeconds), [&]() -> bool { return isAbortCancelled; }); if (isNotAbort) { WLOG(INFO) << "Already finished normally, no abort."; } else { WLOG(INFO) << "Requesting abort."; abortTrigger.store(true); } }; // Run this in a separate thread concurrently with sender/receiver static auto f = std::async(std::launch::async, lambda); return res; } void setAbortChecker(WdtBase &senderOrReceiver) { senderOrReceiver.setAbortChecker(setupAbortChecker()); } void cancelAbort() { { std::unique_lock<std::mutex> lk(abortMutex); isAbortCancelled = true; abortCondVar.notify_one(); } std::this_thread::yield(); } void readManifest(std::istream &fin, WdtTransferRequest &req, bool dfltDirect) { std::string line; while (std::getline(fin, line)) { std::vector<std::string> fields; folly::split('\t', line, fields, true); if (fields.empty() || fields.size() > 3) { WLOG(FATAL) << "Invalid input manifest: " << line; } int64_t filesize = fields.size() > 1 ? folly::to<int64_t>(fields[1]) : -1; bool odirect = fields.size() > 2 ? folly::to<bool>(fields[2]) : dfltDirect; req.fileInfo.emplace_back(fields[0], filesize, odirect); } req.disableDirectoryTraversal = true; } namespace GFLAGS_NAMESPACE { extern GFLAGS_DLL_DECL void (*gflags_exitfunc)(int); } bool badGflagFound = false; static std::string usage; void printUsage() { std::cerr << usage << std::endl; } void sigUSR1Handler(int) { ReportPerfSignalSubscriber::notify(); } int main(int argc, char *argv[]) { #ifdef WDTFBINIT WDTFBINIT #endif FLAGS_logtostderr = true; // Ugliness in gflags' api; to be able to use program name GFLAGS_NAMESPACE::SetArgv(argc, const_cast<const char **>(argv)); GFLAGS_NAMESPACE::SetVersionString(Protocol::getFullVersion()); usage.assign("WDT Warp-speed Data Transfer. v "); usage.append(GFLAGS_NAMESPACE::VersionString()); usage.append(". Sample usage:\nTo transfer from srchost to desthost:\n\t"); usage.append("ssh dsthost "); usage.append(GFLAGS_NAMESPACE::ProgramInvocationShortName()); usage.append(" -directory destdir | ssh srchost "); usage.append(GFLAGS_NAMESPACE::ProgramInvocationShortName()); usage.append(" -directory srcdir -"); usage.append( "\nPassing - as the argument to wdt means start the sender and" " read the"); usage.append( "\nconnection URL produced by the receiver, including encryption" " key, from stdin."); usage.append("\nUse --help to see all the options."); GFLAGS_NAMESPACE::SetUsageMessage(usage); GFLAGS_NAMESPACE::gflags_exitfunc = [](int code) { if (code == 0 || FLAGS_help) { // By default gflags exit 1 with --help and 0 for --version (good) // let's also exit(0) for --help to be like most gnu command line exit(0); } // error cases: if (FLAGS_exit_on_bad_flags) { printUsage(); exit(code); } badGflagFound = true; }; GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); if (badGflagFound) { // will only work for receivers WLOG(ERROR) << "Continuing despite bad flags"; } else { // Only non -flag argument allowed so far is "-" meaning // Read url from stdin and start a sender if (argc > 2 || (argc == 2 && (argv[1][0] != '-' || argv[1][1] != '\0'))) { printUsage(); std::cerr << "Error: argument should be - (to read url from stdin) " << "or no arguments" << std::endl; exit(1); } } signal(SIGPIPE, SIG_IGN); signal(SIGUSR1, sigUSR1Handler); std::string connectUrl; if (!badGflagFound && argc == 2) { std::getline(std::cin, connectUrl); if (connectUrl.empty()) { WLOG(ERROR) << "Sender unable to read connection url from stdin - exiting"; return URI_PARSE_ERROR; } } else { connectUrl = FLAGS_connection_url; } // Might be a sub class (fbonly wdtCmdLine.cpp) Wdt &wdt = WDTCLASS::initializeWdt(FLAGS_app_name); if (FLAGS_print_options) { wdt.printWdtOptions(std::cout); return 0; } WdtOptions &options = wdt.getWdtOptions(); ErrorCode retCode = OK; // Odd ball case of log parsing if (FLAGS_parse_transfer_log) { // Log parsing mode options.enable_download_resumption = true; TransferLogManager transferLogManager(options, FLAGS_directory); transferLogManager.openLog(); bool success = transferLogManager.parseAndPrint(); WLOG_IF(ERROR, !success) << "Transfer log parsing failed"; transferLogManager.closeLog(); return success ? OK : ERROR; } // General case : Sender or Receiver std::unique_ptr<WdtTransferRequest> reqPtr; if (connectUrl.empty()) { reqPtr = std::make_unique<WdtTransferRequest>( options.start_port, options.num_ports, FLAGS_directory); reqPtr->hostName = FLAGS_destination; reqPtr->transferId = FLAGS_transfer_id; if (!FLAGS_test_only_encryption_secret.empty()) { reqPtr->encryptionData = EncryptionParams(parseEncryptionType(options.encryption_type), FLAGS_test_only_encryption_secret); } reqPtr->ivChangeInterval = options.iv_change_interval_mb * kMbToB; reqPtr->tls = wdt.isTlsEnabled(); } else { reqPtr = std::make_unique<WdtTransferRequest>(connectUrl); if (reqPtr->errorCode != OK) { WLOG(ERROR) << "Invalid url \"" << connectUrl << "\" : " << errorCodeToStr(reqPtr->errorCode); return ERROR; } reqPtr->directory = FLAGS_directory; WLOG(INFO) << "Parsed url as " << reqPtr->getLogSafeString(); } WdtTransferRequest &req = *reqPtr; req.wdtNamespace = FLAGS_namespace; if (!FLAGS_dest_id.empty()) { req.destIdentifier = FLAGS_dest_id; } if (FLAGS_protocol_version > 0) { req.protocolVersion = FLAGS_protocol_version; } if (!FLAGS_hostname.empty()) { reqPtr->hostName = FLAGS_hostname; } if (FLAGS_destination.empty() && connectUrl.empty()) { Receiver receiver(req); WdtOptions &recOptions = receiver.getWdtOptions(); if (FLAGS_run_as_daemon) { // Backward compatible with static ports, you can still get dynamic // daemon ports using -start_port 0 like before recOptions.static_ports = true; } if (!FLAGS_recovery_id.empty()) { // TODO: add a test for this recOptions.enable_download_resumption = true; receiver.setRecoveryId(FLAGS_recovery_id); } wdt.wdtSetReceiverSocketCreator(receiver); WdtTransferRequest augmentedReq = receiver.init(); retCode = augmentedReq.errorCode; if (retCode == FEWER_PORTS) { if (FLAGS_treat_fewer_port_as_error) { WLOG(ERROR) << "Receiver could not bind to all the ports"; return FEWER_PORTS; } retCode = OK; } else if (augmentedReq.errorCode != OK) { WLOG(ERROR) << "Error setting up receiver " << errorCodeToStr(retCode); return retCode; } // In the log: WLOG(INFO) << "Starting receiver with connection url " << augmentedReq.getLogSafeString(); // The url without secret // on stdout: the one with secret: std::cout << augmentedReq.genWdtUrlWithSecret() << std::endl; std::cout.flush(); if (FLAGS_fork) { pid_t cpid = fork(); if (cpid == -1) { perror("Failed to fork()"); exit(1); } if (cpid > 0) { WLOG(INFO) << "Detaching receiver"; exit(0); } close(0); close(1); } setAbortChecker(receiver); if (!FLAGS_run_as_daemon) { retCode = receiver.transferAsync(); if (retCode == OK) { std::unique_ptr<TransferReport> report = receiver.finish(); retCode = report->getSummary().getErrorCode(); } } else { retCode = receiver.runForever(); // not reached } } else { // Sender mode if (!FLAGS_manifest.empty()) { // Each line should have the filename and optionally // the filesize separated by a single space if (FLAGS_manifest == "-") { readManifest(std::cin, req, options.odirect_reads); } else { std::ifstream fin(FLAGS_manifest); readManifest(fin, req, options.odirect_reads); fin.close(); } WLOG(INFO) << "Using files lists, number of files " << req.fileInfo.size(); } WLOG(INFO) << "Making Sender with encryption set = " << req.encryptionData.isSet(); retCode = wdt.wdtSend(req, setupAbortChecker()); } cancelAbort(); if (retCode == OK) { WLOG(INFO) << "Returning with OK exit code"; } else { WLOG(ERROR) << "Returning with code " << retCode << " " << errorCodeToStr(retCode); } return retCode; }