cppcache/integration-test/fw_dunit.cpp (628 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifdef USE_SMARTHEAP #include <smrtheap.h> #endif #include <string> #include <iostream> #include <iomanip> #include <list> #include <map> #include <boost/process.hpp> #include <boost/program_options.hpp> #include <boost/interprocess/mapped_region.hpp> #ifdef _WIN32 #include <boost/interprocess/windows_shared_memory.hpp> #else #include <boost/interprocess/shared_memory_object.hpp> #endif #include "fwklib/FwkException.hpp" #define __DUNIT_NO_MAIN__ #include "fw_dunit.hpp" #include "Utils.hpp" namespace bp = boost::process; namespace bip = boost::interprocess; namespace bpo = boost::program_options; static std::string g_programName; static uint32_t g_coordinatorPid = 0; ClientCleanup gClientCleanup; namespace dunit { void setupCRTOutput() { #ifdef _WIN32 #ifdef DEBUG int reportMode = _CRTDBG_MODE_FILE | _CRTDBG_MODE_WNDW; _CrtSetReportMode(_CRT_ASSERT, reportMode); _CrtSetReportFile(_CRT_ASSERT, _CRTDBG_FILE_STDERR); _CrtSetReportMode(_CRT_ERROR, reportMode); _CrtSetReportFile(_CRT_ERROR, _CRTDBG_FILE_STDERR); _CrtSetReportMode(_CRT_WARN, reportMode); _CrtSetReportFile(_CRT_WARN, _CRTDBG_FILE_STDERR); SetErrorMode(SEM_FAILCRITICALERRORS); #endif #endif } // some common values.. #define WORKER_STATE_READY 1 #define WORKER_STATE_DONE 2 #define WORKER_STATE_TASK_ACTIVE 3 #define WORKER_STATE_TASK_COMPLETE 4 #define WORKER_STATE_SCHEDULED 5 void log(std::string s, int lineno, const char *filename); /** uniquely represent each different worker. */ class WorkerId { private: uint32_t m_id; static const char *m_idNames[]; public: explicit WorkerId(uint32_t id) { m_id = id; } int getId() const { return m_id; } const char *getIdName() { return m_idNames[m_id]; } /** return the system id for this process */ int getSystem() { return 1; } /** return the process id for this system. */ int getProcOnSys() { return ((m_id % 2) == 0) ? 2 : 1; } }; const char *WorkerId::m_idNames[] = {"none", "s1p1", "s1p2", "s2p1", "s2p2"}; /** method for letting Task discover its name through RTTI. */ std::string Task::typeName() { return std::string(typeid(*this).name()); } typedef std::list<Task *> TaskList; /** contains a queue of Task* for each WorkerId. */ class TaskQueues { private: std::map<int, TaskList> m_qmap; std::list<int> m_schedule; TaskQueues() : m_qmap(), m_schedule() {} void registerTask(WorkerId sId, Task *task) { m_qmap[sId.getId()].push_back(task); m_schedule.push_back(sId.getId()); } Task *nextTask(WorkerId &sId) { TaskList *tasks = &(m_qmap[sId.getId()]); if (tasks->empty()) { return nullptr; } Task *task = tasks->front(); if (task != nullptr) { LOG(std::string("receieved task: ") + task->m_taskName); tasks->pop_front(); } return task; } int nextWorkerId() { if (m_schedule.empty()) { return 0; } int sId = m_schedule.front(); LOGCOORDINATOR(std::string("Next worker id id : ") + std::to_string(sId)); m_schedule.pop_front(); return sId; } static TaskQueues *taskQueues; public: static void addTask(WorkerId sId, Task *task) { if (taskQueues == nullptr) { taskQueues = new TaskQueues(); } taskQueues->registerTask(sId, task); } static int getWorkerId() { ASSERT(taskQueues != nullptr, "failure to initialize fw_dunit module."); return taskQueues->nextWorkerId(); } static Task *getTask(WorkerId sId) { ASSERT(taskQueues != nullptr, "failure to initialize fw_dunit module."); return taskQueues->nextTask(sId); } }; TaskQueues *TaskQueues::taskQueues = nullptr; /** register task with worker. */ void Task::init(int sId) { init(sId, false); } void Task::init(int sId, bool isHeapAllocated) { m_isHeapAllocated = isHeapAllocated; m_id = sId; m_taskName = this->typeName(); TaskQueues::addTask(WorkerId(sId), this); } class TestState { public: static const auto WORKER_COUNT = 4U; void reset(); void setWorkerTimeout(int id, int seconds); int getWorkerTimeout(int id) const; void setWorkerState(int id, uint8_t state); int getWorkerState(int id) const; void setNextWorker(int id); int getNextWorker(); void fail(); bool failed() const; void terminate(); bool terminated() const; private: bool failure_; bool terminate_; int next_worker_; int worker_timeout_[WORKER_COUNT]; uint8_t worker_state_[WORKER_COUNT]; }; /** main framework entry */ class Dunit { private: static const auto MANAGED_STATE_SIZE = 1UL << 17UL; static Dunit *singleton; bool coordinator_; bip::mapped_region globals_region_; #ifdef _WIN32 bip::windows_shared_memory globals_shm_; #else bip::shared_memory_object globals_shm_; #endif bip::managed_shared_memory managed_state_; explicit Dunit(bool coordinator) : coordinator_(coordinator) { if (coordinator) { removeStates(); #ifdef _WIN32 globals_shm_ = bip::windows_shared_memory{bip::create_only, getSharedName(), bip::read_write, sizeof(TestState)}; #else globals_shm_ = bip::shared_memory_object{ bip::create_only, getSharedName(), bip::read_write}; globals_shm_.truncate(sizeof(TestState)); #endif managed_state_ = bip::managed_shared_memory{ bip::create_only, getManagedStateName(), MANAGED_STATE_SIZE}; } else { using shared_memory = #ifdef _WIN32 bip::windows_shared_memory; #else bip::shared_memory_object; #endif globals_shm_ = shared_memory{bip::open_only, getSharedName(), bip::read_write}; managed_state_ = bip::managed_shared_memory{bip::open_only, getManagedStateName()}; } globals_region_ = bip::mapped_region{globals_shm_, bip::read_write}; if (coordinator) { getState()->reset(); } } ~Dunit() { if (coordinator_) { removeStates(); } } static void removeStates() { bip::shared_memory_object::remove(getSharedName()); bip::shared_memory_object::remove(getManagedStateName()); } static const char *getSharedName() { static std::string name = std::string{std::getenv("TESTNAME")} + '.' + std::to_string(g_coordinatorPid); return name.c_str(); } static const char *getManagedStateName() { static std::string name = std::string{std::getenv("TESTNAME")} + ".managed." + std::to_string(g_coordinatorPid); return name.c_str(); } public: /** call this once just inside main... */ static void init(bool coordinator) { singleton = new Dunit(coordinator); } /** return the already initialized singleton Dunit instance. */ static Dunit *getSingleton() { ASSERT(singleton != nullptr, "singleton not created yet."); return singleton; } /** delete the existing singleton */ static void close() { Dunit *tmp = singleton; singleton = nullptr; delete tmp; } TestState *getState() { return reinterpret_cast<TestState *>(globals_region_.get_address()); } bip::managed_shared_memory &getManagedState() { return managed_state_; } }; #define DUNIT dunit::Dunit::getSingleton() Dunit *Dunit::singleton = nullptr; void TestState::reset() { next_worker_ = 0; failure_ = false; terminate_ = false; for (auto i = 0U; i < WORKER_COUNT; ++i) { worker_state_[i] = 0; worker_timeout_[i] = -1; } } void TestState::setWorkerTimeout(int id, int seconds) { worker_timeout_[id - 1] = seconds; } int TestState::getWorkerTimeout(int id) const { return worker_timeout_[id - 1]; } void TestState::setWorkerState(int id, uint8_t state) { worker_state_[id - 1] = state; } int TestState::getWorkerState(int id) const { return worker_state_[id - 1]; } void TestState::setNextWorker(int id) { next_worker_ = id; } int TestState::getNextWorker() { return next_worker_; } void TestState::fail() { failure_ = true; } bool TestState::failed() const { return failure_; } void TestState::terminate() { terminate_ = true; } bool TestState::terminated() const { return terminate_; } void Task::setTimeout(int seconds) { auto state = DUNIT->getState(); if (seconds > 0) { state->setWorkerTimeout(m_id, seconds); } else { state->setWorkerTimeout(m_id, TASK_TIMEOUT); } } class TestProcess { public: TestProcess(const std::string &cmdline, uint32_t id) : id_{id}, running_{false}, cmd_{cmdline} {} WorkerId &getWorkerId() { return id_; } void run() { auto arguments = bpo::split_unix(cmd_); std::string exe = arguments[0]; arguments.erase(arguments.begin()); process_ = bp::child(exe, bp::args = arguments); process_.wait(); if (process_.exit_code() != 0) { std::clog << "Worker " << id_.getIdName() << " exited with code " << process_.exit_code() << std::endl; } running_ = false; } void start() { running_ = true; thread_ = std::thread{[this]() { run(); }}; } void stop() { if (thread_.joinable()) { thread_.join(); } } bool running() const { return running_; } protected: WorkerId id_; bool running_; std::string cmd_; bp::child process_; std::thread thread_; }; /** * Container of TestProcess(es) held in driver. each represents one of the * legal WorkerIds spawned when TestDriver is created. */ class TestDriver { private: TestProcess *m_workers[4]; public: TestDriver() { dunit::Dunit::init(true); std::cout << "Coordinator starting workers.\n"; for (uint32_t i = 1; i < 5; i++) { std::string cmdline; cmdline = g_programName + " -s" + std::to_string(i) + " -m" + std::to_string(g_coordinatorPid); std::cout << cmdline.c_str() << "\n"; m_workers[i - 1] = new TestProcess(cmdline, i); } std::cout << std::flush; // start each of the workers... for (uint32_t j = 1; j < 5; j++) { m_workers[j - 1]->start(); std::this_thread::sleep_for( std::chrono::seconds(2)); // do not increase this to avoid precheckin // runs taking much longer. } } ~TestDriver() { for (uint32_t i = 0; i < TestState::WORKER_COUNT;) { auto worker = m_workers[i++]; worker->stop(); delete worker; } dunit::Dunit::close(); } int begin() { std::cout << "Coordinator started with pid " << boost::this_process::get_id() << "\n" << std::flush; waitForReady(); // dispatch task... int nextWorker; auto state = DUNIT->getState(); while ((nextWorker = TaskQueues::getWorkerId()) != 0) { WorkerId sId(nextWorker); state->setWorkerState(nextWorker, WORKER_STATE_SCHEDULED); std::cout << "Set next process to " << sId.getIdName() << "\n" << std::flush; state->setNextWorker(nextWorker); waitForCompletion(sId); // check special conditions. if (state->failed()) { state->terminate(); waitForDone(); return 1; } } // end all work.. state->terminate(); waitForDone(); return 0; } /** wait for an individual worker to finish a task. */ void waitForCompletion(WorkerId &sId) { auto id = sId.getId(); auto state = DUNIT->getState(); int secs = state->getWorkerTimeout(id); state->setWorkerTimeout(id, TASK_TIMEOUT); if (secs <= 0) { secs = TASK_TIMEOUT; } std::cout << "Waiting " << secs << " seconds for " << sId.getIdName() << " to finish task.\n" << std::flush; auto end = std::chrono::steady_clock::now() + std::chrono::seconds{secs}; while (state->getWorkerState(id) != WORKER_STATE_TASK_COMPLETE) { // sleep a bit.. if (state->failed()) { return; } std::this_thread::sleep_for(std::chrono::milliseconds{100}); checkWorkerDeath(); auto now = std::chrono::steady_clock::now(); if (now >= end) { handleTimeout(sId); break; } } } void handleTimeout() { std::cout << "Error: Timed out waiting for all workers to be ready.\n" << std::flush; auto state = DUNIT->getState(); state->terminate(); state->fail(); } void handleTimeout(WorkerId &sId) { std::cout << "Error: Timed out waiting for " << sId.getIdName() << " to finish task.\n" << std::flush; auto state = DUNIT->getState(); state->terminate(); state->fail(); } /** wait for all workers * to be done initializing. */ void waitForReady() { auto state = DUNIT->getState(); std::cout << "Waiting " << TASK_TIMEOUT << " seconds for all workers to be ready.\n" << std::flush; auto end = std::chrono::steady_clock::now() + std::chrono::seconds{TASK_TIMEOUT}; uint32_t readyCount = 0; while (readyCount < TestState::WORKER_COUNT) { std::cout << "Ready Count: " << readyCount << "\n" << std::flush; if (state->failed()) { return; } std::this_thread::sleep_for(std::chrono::seconds{1}); readyCount = 0; for (uint32_t i = 1; i < 5; i++) { int status = state->getWorkerState(i); if (status == WORKER_STATE_READY) { ++readyCount; } } checkWorkerDeath(); auto now = std::chrono::steady_clock::now(); if (now >= end) { handleTimeout(); break; } } } /** wait for all workers to be destroyed. */ void waitForDone() { auto state = DUNIT->getState(); std::cout << "Waiting " << TASK_TIMEOUT << " seconds for all workers to complete.\n" << std::flush; uint32_t doneCount = 0; auto end = std::chrono::steady_clock::now() + std::chrono::seconds{TASK_TIMEOUT}; while (doneCount < TestState::WORKER_COUNT) { // if ( DUNIT->getFailed() ) return; // sleep a bit.. std::this_thread::sleep_for(std::chrono::milliseconds{100}); doneCount = 0; for (uint32_t i = 1; i < 5; i++) { int status = state->getWorkerState(i); if (status == WORKER_STATE_DONE) { ++doneCount; } } auto now = std::chrono::steady_clock::now(); if (now >= end) { handleTimeout(); break; } } } /** test to see that all the worker processes are still around, or throw a TestException so the driver doesn't get hung. */ void checkWorkerDeath() { auto state = DUNIT->getState(); for (uint32_t i = 0; i < TestState::WORKER_COUNT; i++) { if (!m_workers[i]->running()) { auto msg = std::string("Error: Worker ") + m_workers[i]->getWorkerId().getIdName() + " terminated prematurely"; LOG(msg); state->fail(); state->terminate(); FAIL(msg); } } } }; class TestWorker { private: WorkerId m_sId; public: static WorkerId *procWorkerId; explicit TestWorker(int id) : m_sId(id) { procWorkerId = new WorkerId(id); dunit::Dunit::init(false); DUNIT->getState()->setWorkerState(m_sId.getId(), WORKER_STATE_READY); std::clog << "Started worker " << id << std::endl; } ~TestWorker() { DUNIT->getState()->setWorkerState(m_sId.getId(), WORKER_STATE_DONE); dunit::Dunit::close(); } void begin() { auto state = DUNIT->getState(); std::cout << "Worker " << m_sId.getIdName() << " started with pid " << boost::this_process::get_id() << "\n" << std::flush; // consume tasks of this workers queue, only when it is his turn.. while (!state->terminated()) { if (state->getNextWorker() == m_sId.getId()) { // set next worker to zero so I don't accidently run twice. state->setNextWorker(0); // do next task... Task *task = TaskQueues::getTask(m_sId); // perform task. if (task != nullptr) { state->setWorkerState(m_sId.getId(), WORKER_STATE_TASK_ACTIVE); try { task->doTask(); if (task->m_isHeapAllocated) { delete task; } fflush(stdout); state->setWorkerState(m_sId.getId(), WORKER_STATE_TASK_COMPLETE); } catch (TestException te) { if (task->m_isHeapAllocated) { delete task; } te.print(); handleError(); return; } catch (...) { if (task->m_isHeapAllocated) { delete task; } LOG("Unhandled exception, terminating."); handleError(); return; } } } std::this_thread::sleep_for(std::chrono::milliseconds{100}); } } void handleError() { auto state = DUNIT->getState(); state->fail(); state->terminate(); state->setWorkerState(m_sId.getId(), WORKER_STATE_TASK_COMPLETE); } }; WorkerId *TestWorker::procWorkerId = nullptr; void sleep(int millis) { if (millis == 0) { std::this_thread::yield(); } else { std::this_thread::sleep_for(std::chrono::milliseconds{millis}); } } void logCoordinator(std::string s, int lineno, const char * /*filename*/) { using std::chrono::duration_cast; using std::chrono::microseconds; using std::chrono::system_clock; auto now = system_clock::now(); auto in_time_t = system_clock::to_time_t(now); auto localtime = std::localtime(&in_time_t); auto usec = duration_cast<microseconds>(now.time_since_epoch()).count() % 1000; std::cout << "[TEST " << std::put_time(localtime, "%Y/%m/%d %H:%M:%S") << '.' << std::setfill('0') << std::setw(6) << usec << std::setw(0) << " coordinator:pid(" << boost::this_process::get_id() << ")] " << s << " at line: " << lineno << std::endl << std::flush; } // log a message and print the worker id as well.. used by fw_helper with no // worker id. void log(std::string s, int lineno, const char * /*filename*/, int /*id*/) { using std::chrono::duration_cast; using std::chrono::microseconds; using std::chrono::system_clock; auto now = system_clock::now(); auto in_time_t = system_clock::to_time_t(now); auto localtime = std::localtime(&in_time_t); auto usec = duration_cast<microseconds>(now.time_since_epoch()).count() % 1000; std::cout << "[TEST " << std::put_time(localtime, "%Y/%m/%d %H:%M:%S") << '.' << std::setfill('0') << std::setw(6) << usec << std::setw(0) << " 0:pid(" << boost::this_process::get_id() << ")] " << s << " at line: " << lineno << std::endl << std::flush; } // log a message and print the worker id as well.. void log(std::string s, int lineno, const char * /*filename*/) { using std::chrono::duration_cast; using std::chrono::microseconds; using std::chrono::system_clock; auto now = system_clock::now(); auto in_time_t = system_clock::to_time_t(now); auto localtime = std::localtime(&in_time_t); auto usec = duration_cast<microseconds>(now.time_since_epoch()).count() % 1000; std::cout << "[TEST " << std::put_time(localtime, "%Y/%m/%d %H:%M:%S") << '.' << std::setfill('0') << std::setw(6) << usec << std::setw(0) << ' ' << (dunit::TestWorker::procWorkerId ? dunit::TestWorker::procWorkerId->getIdName() : "coordinator") << ":pid(" << boost::this_process::get_id() << ")] " << s << " at line: " << lineno << std::endl << std::flush; } int dmain(int argc, char *argv[]) { using apache::geode::client::Utils; #ifdef USE_SMARTHEAP MemRegisterTask(); #endif setupCRTOutput(); auto timebomb = std::chrono::seconds{std::stoi(Utils::getEnv("TIMEBOMB"))}; TimeBomb tb(timebomb, []() { gClientCleanup.trigger(); }); tb.arm(); g_programName = argv[0]; bpo::options_description generic("Options"); auto &&options = generic.add_options(); options("worker,s", bpo::value<int>(), "Set worker ID"); options("coordinator,m", bpo::value<int>(), "Set coordinator PID"); options("help", "Shows this help"); bpo::variables_map vm; bpo::store(bpo::parse_command_line(argc, argv, generic), vm); bpo::notify(vm); int result = 0; int workerId = 0; auto iter = vm.find("worker"); if (iter != vm.end()) { workerId = iter->second.as<int>(); } iter = vm.find("coordinator"); if (iter != vm.end()) { g_coordinatorPid = iter->second.as<int>(); } else { g_coordinatorPid = boost::this_process::get_id(); } try { if (workerId > 0) { dunit::TestWorker worker(workerId); worker.begin(); } else { dunit::TestDriver tdriver; result = tdriver.begin(); if (result == 0) { std::cout << "#### All Tasks completed successfully. ####\n"; } else { std::cout << "#### FAILED. ####\n"; } fflush(stdout); } std::cout << "final worker id " << workerId << ", result " << result << "\n"; std::cout << "before calling cleanup " << workerId << "\n"; gClientCleanup.trigger(); std::cout << "after calling cleanup\n"; return result; } catch (dunit::TestException &te) { te.print(); } catch (apache::geode::client::testframework::FwkException &fe) { std::cout << "Exception: " << fe.what() << "\n" << std::flush; } catch (std::exception &ex) { std::cout << "Exception: system exception reached main: " << ex.what() << ".\n" << std::flush; } catch (...) { std::cout << "Exception: unhandled/unidentified exception reached main.\n" << std::flush; } gClientCleanup.trigger(); return 1; } /** entry point for test code modules to access the naming service. */ bip::managed_shared_memory &globals() { return DUNIT->getManagedState(); } } // namespace dunit