rpc/out-of-order-execution.h (97 lines of code) (raw):

/* Copyright 2022 The Photon Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ /* This module implements a generic framework that enables concurrent out-of-ordering exeuction of *asynchronous* operations, while providing a simple *synchronous* interface. This framework supports potentially any asynchronous operation, by dividing an operation into 3 parts: issue, wait for completion and collection of result. The first 2 parts are realized via callbacks. */ #pragma once #include <photon/common/callback.h> #include <photon/common/timeout.h> #include <photon/thread/thread.h> namespace photon{ struct thread; namespace rpc { class OutOfOrder_Execution_Engine; OutOfOrder_Execution_Engine* new_ooo_execution_engine(); void delete_ooo_execution_engine(OutOfOrder_Execution_Engine* engine); enum class OooPhase : int { BEFORE_ISSUE = 0, ISSUED = 1, WAITING = 2, COLLECTED = 3 }; struct OutOfOrderContext { OutOfOrder_Execution_Engine* engine; // an unique tag of the opeartion, which can be filled // by user (together with `flag_tag_valid` = true), // by the `engine`, or by `do_completion`. uint64_t tag = 0; // The `CallbackType` have an prototype of // either `int (*)(void*, OutOfOrderContext*)`, // or `int (CLAZZ::*)(OutOfOrderContext*)`. typedef Callback<OutOfOrderContext*> CallbackType; // The callback to issue an asynchronous operation, with // a tag specified in the argument. The tag should be retrieved // when the operation completes. // It's guaranteed not to be called concurrently. CallbackType do_issue; // The callback to do a blocking wait for the completion of any // issued operations, storing its *tag* to the `tag` field of the // provided `OutOfOrderContext` argument. After a successful return, // it's guaranteed not to be called again before `ooo_result_collected()`. // It's guaranteed not to be called concurrently. CallbackType do_completion; // The callback to do a blocking wait for the completion of any // issued operations, storing its *tag* to the `tag` field of the // provided `OutOfOrderContext` argument. After a successful return, // it's guaranteed not to be called again before `ooo_result_collected()`. // It's guaranteed not to be called concurrently. CallbackType do_collect; // thread that binding with this argument thread * th = nullptr; // Timeout for wait Timeout timeout; // Context phase photon::spinlock phaselock; volatile OooPhase phase = OooPhase::BEFORE_ISSUE; // return value of collection int ret = -1; // whether or not the `tag` field is valid bool flag_tag_valid = false; OutOfOrderContext() = default; OutOfOrderContext& operator=(const OutOfOrderContext& rhs) { engine = rhs.engine; tag = rhs.tag; do_issue = rhs.do_issue; do_completion = rhs.do_completion; do_collect = rhs.do_collect; th = rhs.th; timeout = rhs.timeout; flag_tag_valid = rhs.flag_tag_valid; phase = rhs.phase; ret = rhs.ret; return *this; } }; // Issue an asynchronous operation, // storing it's *tag* to args if (!args.flag_tag_valid). // return 0 for success, negative for failure // Arguments: engine, do_issue, [tag, flag_tag_valid] extern "C" int ooo_issue_operation(OutOfOrderContext& args); // Wait for the completion of the operation. // returns 0 for success, negative for failures // if returns -2 and errno == ENOENT, there is a completed // operation but there is no caller in the registry to // collect the result, so users have to fix it up. // Arguments: engine, do_issue, [tag, flag_tag_valid], do_completion extern "C" int ooo_wait_completion(OutOfOrderContext& args); // Issue and operation and wait for its completion. // Return values are defined the same as above // Arguments: engine, do_completion extern "C" int ooo_issue_wait(OutOfOrderContext& args); // Inform the engine that the result has been colleted, // so that the engine can goes on. // Arguments: engine extern "C" void ooo_result_collected(OutOfOrderContext& args); // return concurrent task num of ooo engine extern "C" int ooo_get_queue_count(OutOfOrder_Execution_Engine* engine); // an exmaple on usage of the ooo engine inline void ooo_execution_example() { class Example { public: Example() { m_engine = new_ooo_execution_engine(); } uint64_t OOO_Operation() { OutOfOrderContext args; init_ooo_args(&args); ooo_issue_wait(args); uint64_t ret = this->collect_result(); ooo_result_collected(args); return ret; } ~Example() { delete_ooo_execution_engine(m_engine); } private: OutOfOrder_Execution_Engine* m_engine; void init_ooo_args(OutOfOrderContext* args) { args->engine = m_engine; args->do_issue.bind(this, &Example::issue); args->do_completion.bind(this, &Example::complete); } int issue(OutOfOrderContext* args) { // issue the async operation with a tag of `args->tag` return 0; } int complete(OutOfOrderContext* args) { // wait for a result of any issued async operation, // and store its tag to `args->tag`. return 0; } uint64_t collect_result() { // collect and return the result return UINT64_MAX; } }; Example().OOO_Operation(); } } }