in rpc/out-of-order-execution.cpp [113:226]
// Waits for the result of the context `args`, looked up in m_map by args.tag,
// and returns args.ret once that result has been collected.
//
// The function works in two stages:
//   1. Under args.phaselock, validate the phase and then either try to take
//      m_mutex_r or sleep on m_wait until the result is collected, the wait
//      fails, or m_mutex_r can be taken.
//   2. While holding m_mutex_r, repeatedly call args.do_completion() to
//      receive responses, run do_collect() on the context each response
//      belongs to, and interrupt the thread recorded in that context, until
//      the response for this caller's own tag arrives.
//
// Returns args.ret on success. The error paths return -1 or -2 through
// LOG_ERROR_RETURN.
// NOTE(review): LOG_ERROR_RETURN presumably logs, sets errno to its first
// argument and returns its second; DEFER presumably runs its expression when
// the enclosing scope exits; SCOPED_LOCK presumably holds the lock until the
// end of the enclosing scope -- confirm against their definitions.
int wait_completion(OutOfOrderContext& args) // receiving work
{
{
// Check that the context has been issued, i.e. its tag is registered in m_map.
SCOPED_LOCK(m_mutex_map);
if (m_map.find(args.tag) == m_map.end()) {
LOG_ERROR_RETURN(EINVAL, -1,
"context not found in map");
}
}
// Notify one waiter on m_wait when this function is left (every return path
// below), presumably so that another waiter can retry taking m_mutex_r.
DEFER(m_wait.notify_one());
{
SCOPED_LOCK(args.phaselock);
// Reject contexts that were never issued or that are already being waited on.
if (args.phase == OooPhase::BEFORE_ISSUE)
LOG_ERROR_RETURN(EINVAL, -1, "context not issued");
if (args.phase == OooPhase::WAITING)
LOG_ERROR_RETURN(EINVAL, -1, "context already in waiting");
// Loop until this thread holds m_mutex_r; the other ways out are the returns
// inside the switch.
for (bool hold_lock = false; !hold_lock;) {
switch (args.phase) {
case OooPhase::COLLECTED:
// The result was already collected before (or while) waiting.
if (args.th != CURRENT)
LOG_ERROR_RETURN(EINVAL, -1, "context is not issued by current thread");
return args.ret;
case OooPhase::ISSUED:
// First pass: record the waiting thread and move to WAITING.
args.th = photon::CURRENT;
args.phase = OooPhase::WAITING;
// Intentional fall-through into the WAITING case.
case OooPhase::WAITING:
{
// A return value of 0 from try_lock() is treated as "lock acquired".
if (m_mutex_r.try_lock() == 0) {
hold_lock = true;
// Leaves the switch; the for loop then ends because hold_lock is set.
break;
}
// m_mutex_r is held by someone else: sleep on m_wait for up to args.timeout.
// NOTE(review): wait() presumably releases args.phaselock while sleeping
// and re-acquires it before returning -- confirm.
auto ret = m_wait.wait(args.phaselock, args.timeout);
// Check whether the result was collected for this thread in the meantime.
if (args.phase == OooPhase::COLLECTED &&
args.th == CURRENT) {
return args.ret;
}
if (ret == -1) {
// Not collected and wait() returned -1: treated as a timeout. Drop this
// context's tag from the map and notify m_cond_collected.
{
SCOPED_LOCK(m_mutex_map);
m_map.erase(args.tag);
m_cond_collected.notify_one();
}
LOG_ERROR_RETURN(ETIMEDOUT, -1, "waiting for completion timeout");
}
// Woken up without a result: go around the loop and re-examine the phase.
break;
}
default:
LOG_ERROR_RETURN(EINVAL, -1, "unexpected phase");
}
}
}
// From here on this thread holds m_mutex_r.
// Remember the caller's own tag: args.tag is compared against it and used for
// the map lookup after do_completion(), so do_completion() is expected to
// overwrite args.tag with the tag of the response it received (TODO confirm).
auto o_tag = args.tag;
DEFER(m_mutex_r.unlock());
for (;;) {
int ret = args.do_completion(&args);
// This do_completion() may receive a result that belongs to another thread.
// That still works even when each issue has its own do_completion() that only
// it can use to receive its result: such a thread simply keeps waiting until
// it holds the lock and receives the result itself.
// Because it is not known in advance which thread will receive the result of
// a given issue, the user must make sure that do_completion() can at least
// receive the result of its own issue.
OutOfOrderContext* targ = nullptr;
unordered_map<uint64_t, OutOfOrderContext*>::iterator it;
{
SCOPED_LOCK(m_mutex_map);
// Notify m_cond_collected when this map-locked scope is left, including the
// error returns below.
DEFER(m_cond_collected.notify_one());
if (ret < 0) {
// Receiving failed: remove the caller's own tag from the map and give up.
// NOTE(review): an earlier comment here spoke of storing nullptr for a
// context whose wait_completion failed; the code erases the entry instead.
m_map.erase(o_tag);
LOG_ERROR_RETURN(0, -1, "failed to do_completion()");
}
it = m_map.find(args.tag);
if (it == m_map.end()) {
// The response's tag is not registered (never issued, or already removed):
// drop the caller's own tag as well and fail with -2.
m_map.erase(o_tag);
LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag);
}
// Take the context the response belongs to out of the map.
targ = it->second;
m_map.erase(it);
}
// Collect the result with m_mutex_r still held (m_mutex_map already released).
targ->ret = targ->do_collect(targ);
{
photon::thread *th;
{
// Read the recorded thread and mark the context as collected under its
// phaselock.
SCOPED_LOCK(targ->phaselock);
th = targ->th;
targ->phase = OooPhase::COLLECTED;
}
if (o_tag == args.tag) {
// The response carries the caller's own tag; it must have been recorded for
// the current thread.
if (th != CURRENT) {
LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT));
}
return args.ret; // this is my own result: stop receiving and
// return it
}
if (!th)
// The context has no thread recorded in it: the code treats this as the
// requesting thread having failed while waiting for completion.
LOG_ERROR_RETURN(ENOENT, -2, "response recvd, but requesting thread is NULL!");
thread_interrupt(th, EINTR); // another thread's response: resume that thread
}
}
}