int wait_completion()

in rpc/out-of-order-execution.cpp [113:226]


        /**
         * Wait for the result of an operation that was previously issued with `args`.
         *
         * Outline of what the code below does:
         *  1. Verify that `args.tag` is present in `m_map` and that `args.phase`
         *     allows waiting (not BEFORE_ISSUE, not already WAITING).
         *  2. If the result is already COLLECTED, return `args.ret` right away.
         *     Otherwise try to take `m_mutex_r`; on failure, sleep on `m_wait`
         *     (bounded by `args.timeout`) and re-check the phase afterwards.
         *  3. Once `m_mutex_r` is held, act as the receiver: repeatedly call
         *     `args.do_completion(&args)`, look up the context registered under
         *     the tag now found in `args.tag`, run that context's `do_collect()`,
         *     mark it COLLECTED, and either return (own response) or interrupt
         *     the owning thread (someone else's response).
         *
         * @param args  Context of the operation. Reads/writes `tag`, `phase`, `th`,
         *              `ret`; uses `phaselock`, `timeout`, `do_completion`, `do_collect`.
         * @return `args.ret` when this context's result has been collected;
         *         -1 on invalid phase/ownership, wait failure/timeout, or a negative
         *         `do_completion()` result; -2 when the received tag is not in
         *         `m_map` or the target context has no thread recorded.
         *         (LOG_ERROR_RETURN is a macro defined elsewhere; presumably it logs,
         *         sets errno to its first argument and returns its second.)
         */
        int wait_completion(OutOfOrderContext& args) // receiving work
        {
            {
                // check if context issued
                SCOPED_LOCK(m_mutex_map);
                if (m_map.find(args.tag) == m_map.end()) {
                    LOG_ERROR_RETURN(EINVAL, -1,
                                        "context not found in map");
                }
            }
            // DEFER presumably runs its expression when the enclosing scope exits,
            // i.e. one waiter on m_wait is notified on every return path below.
            DEFER(m_wait.notify_one());
            {
                SCOPED_LOCK(args.phaselock);
                if (args.phase == OooPhase::BEFORE_ISSUE)
                    LOG_ERROR_RETURN(EINVAL, -1, "context not issued");
                if (args.phase == OooPhase::WAITING)
                    LOG_ERROR_RETURN(EINVAL, -1, "context already in waiting");
                // Loop until this thread owns m_mutex_r (hold_lock == true) or
                // returns from inside the switch.
                for (bool hold_lock = false; !hold_lock;) {
                    switch (args.phase) {                        
                        case OooPhase::COLLECTED:
                            // result already collected before wait
                            if (args.th != CURRENT)
                                LOG_ERROR_RETURN(EINVAL, -1, "context is not issued by current thread");
                            return args.ret;
                        case OooPhase::ISSUED:
                            // first time here: record the waiting thread and
                            // move to WAITING
                            args.th = photon::CURRENT;
                            args.phase = OooPhase::WAITING;
                            // fall through: no break, continues as WAITING
                        case OooPhase::WAITING:
                            {
                                if (m_mutex_r.try_lock() == 0) {
                                    // became the receiver; this break leaves the
                                    // switch and the loop condition ends the loop
                                    hold_lock = true;
                                    break;
                                }
                                // Someone else holds m_mutex_r: sleep on m_wait.
                                // (wait() presumably releases args.phaselock while
                                // sleeping and re-acquires it before returning.)
                                auto ret = m_wait.wait(args.phaselock, args.timeout);
                                // Check if collected
                                if (args.phase == OooPhase::COLLECTED &&
                                    args.th == CURRENT) {
                                    return args.ret;
                                }
                                if (ret == -1) {
                                    // or just timed out
                                    // NOTE(review): any -1 from wait() that is not
                                    // accompanied by COLLECTED is handled as a
                                    // timeout here, whatever the actual cause.
                                    {
                                        // drop our entry so a late response is
                                        // no longer matched to this context
                                        SCOPED_LOCK(m_mutex_map);
                                        m_map.erase(args.tag);
                                        m_cond_collected.notify_one();
                                    }
                                    LOG_ERROR_RETURN(ETIMEDOUT, -1, "waiting for completion timeout");
                                }
                                // woken without result and without error:
                                // leave the switch and re-evaluate args.phase
                                break;
                            }
                        default:
                            LOG_ERROR_RETURN(EINVAL, -1, "unexpected phase");
                    }
                }
            }

            // Holding mutex_r
            // My origin tag is o_tag
            // (saved because args.tag is compared against it after
            // do_completion(); presumably do_completion() overwrites args.tag
            // with the tag of the response it received)
            auto o_tag = args.tag;
            DEFER(m_mutex_r.unlock());
            for (;;) {
                int ret = args.do_completion(&args);
                // This do_completion() may receive results that belong to other
                // threads. That still works: even if this issue has a unique
                // do_completion() so that other threads can never receive its
                // result, the issuing thread keeps waiting until it holds the
                // lock and receives the result by itself.
                // Since a thread cannot know which thread will receive the result
                // of an issue, the user must make sure that do_completion() can
                // at least receive the result of its own issue.
                OutOfOrderContext* targ = nullptr;
                unordered_map<uint64_t, OutOfOrderContext*>::iterator it;
                {
                    SCOPED_LOCK(m_mutex_map);
                    // notify one waiter on m_cond_collected when this scope is
                    // left (the waiters are not visible in this function)
                    DEFER(m_cond_collected.notify_one());
                    if (ret < 0) {
                        // do_completion() failed: remove this context's own entry
                        // from the map before reporting the error
                        // NOTE(review): errno argument is 0 here; presumably this
                        // keeps whatever errno do_completion() left - confirm.
                        m_map.erase(o_tag);
                        LOG_ERROR_RETURN(0, -1, "failed to do_completion()");
                    }

                    // look up the context registered under the received tag
                    it = m_map.find(args.tag);

                    if (it == m_map.end()) {
                        // response tag never issued
                        // (or already erased, e.g. by the timeout path above);
                        // our own entry is removed as well before returning
                        m_map.erase(o_tag);
                        LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag);
                    }
                    targ = it->second;
                    // the entry is removed before the result is collected
                    m_map.erase(it);
                }

                // collect with mutex_r
                // (m_mutex_map is already released here; the result of
                // do_collect() is stored in the target context)
                targ->ret = targ->do_collect(targ);

                {
                    photon::thread *th;
                    {
                        // publish COLLECTED and read the owner under the
                        // target's phaselock
                        SCOPED_LOCK(targ->phaselock);
                        th = targ->th;
                        targ->phase = OooPhase::COLLECTED;
                    }
                    if (o_tag == args.tag) {
                        // the response carries our own tag
                        if (th != CURRENT) {
                            LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT));
                        }
                        return args.ret;  // it's my result, let's break, and
                                          // collect it
                    }
                    if (!th)
                        // issued but requesting thread just failed in completion when waiting
                        // NOTE(review): this returns without erasing o_tag from
                        // m_map and without restoring args.tag - confirm intended.
                        LOG_ERROR_RETURN(ENOENT, -2, "response recvd, but requesting thread is NULL!");
                    thread_interrupt(th, EINTR);    // other threads' response, resume him
                }
            }
        }