ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow()

in absl/synchronization/mutex.cc [2035:2334]


ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) {
  SchedulingGuard::ScopedDisable disable_rescheduling;
  intptr_t v = mu_.load(std::memory_order_relaxed);
  this->AssertReaderHeld();
  CheckForMutexCorruption(v, "Unlock");
  if ((v & kMuEvent) != 0) {
    PostSynchEvent(this,
                (v & kMuWriter) != 0? SYNCH_EV_UNLOCK: SYNCH_EV_READERUNLOCK);
  }
  int c = 0;
  // the waiter under consideration to wake, or zero
  PerThreadSynch *w = nullptr;
  // the predecessor to w or zero
  PerThreadSynch *pw = nullptr;
  // head of the list searched previously, or zero
  PerThreadSynch *old_h = nullptr;
  // a condition that's known to be false.
  const Condition *known_false = nullptr;
  PerThreadSynch *wake_list = kPerThreadSynchNull;   // list of threads to wake
  intptr_t wr_wait = 0;        // set to kMuWrWait if we wake a reader and a
                               // later writer could have acquired the lock
                               // (starvation avoidance)
  ABSL_RAW_CHECK(waitp == nullptr || waitp->thread->waitp == nullptr ||
                     waitp->thread->suppress_fatal_errors,
                 "detected illegal recursion into Mutex code");
  // This loop finds threads wake_list to wakeup if any, and removes them from
  // the list of waiters.  In addition, it places waitp.thread on the queue of
  // waiters if waitp is non-zero.
  for (;;) {
    v = mu_.load(std::memory_order_relaxed);
    if ((v & kMuWriter) != 0 && (v & (kMuWait | kMuDesig)) != kMuWait &&
        waitp == nullptr) {
      // fast writer release (writer with no waiters or with designated waker)
      if (mu_.compare_exchange_strong(v, v & ~(kMuWrWait | kMuWriter),
                                      std::memory_order_release,
                                      std::memory_order_relaxed)) {
        return;
      }
    } else if ((v & (kMuReader | kMuWait)) == kMuReader && waitp == nullptr) {
      // fast reader release (reader with no waiters)
      intptr_t clear = ExactlyOneReader(v) ? kMuReader | kMuOne : kMuOne;
      if (mu_.compare_exchange_strong(v, v - clear,
                                      std::memory_order_release,
                                      std::memory_order_relaxed)) {
        return;
      }
    } else if ((v & kMuSpin) == 0 &&  // attempt to get spinlock
               mu_.compare_exchange_strong(v, v | kMuSpin,
                                           std::memory_order_acquire,
                                           std::memory_order_relaxed)) {
      if ((v & kMuWait) == 0) {       // no one to wake
        intptr_t nv;
        bool do_enqueue = true;  // always Enqueue() the first time
        ABSL_RAW_CHECK(waitp != nullptr,
                       "UnlockSlow is confused");  // about to sleep
        do {    // must loop to release spinlock as reader count may change
          v = mu_.load(std::memory_order_relaxed);
          // decrement reader count if there are readers
          intptr_t new_readers = (v >= kMuOne)?  v - kMuOne : v;
          PerThreadSynch *new_h = nullptr;
          if (do_enqueue) {
            // If we are enqueuing on a CondVar (waitp->cv_word != nullptr) then
            // we must not retry here.  The initial attempt will always have
            // succeeded, further attempts would enqueue us against *this due to
            // Fer() handling.
            do_enqueue = (waitp->cv_word == nullptr);
            new_h = Enqueue(nullptr, waitp, new_readers, kMuIsCond);
          }
          intptr_t clear = kMuWrWait | kMuWriter;  // by default clear write bit
          if ((v & kMuWriter) == 0 && ExactlyOneReader(v)) {  // last reader
            clear = kMuWrWait | kMuReader;                    // clear read bit
          }
          nv = (v & kMuLow & ~clear & ~kMuSpin);
          if (new_h != nullptr) {
            nv |= kMuWait | reinterpret_cast<intptr_t>(new_h);
          } else {  // new_h could be nullptr if we queued ourselves on a
                    // CondVar
            // In that case, we must place the reader count back in the mutex
            // word, as Enqueue() did not store it in the new waiter.
            nv |= new_readers & kMuHigh;
          }
          // release spinlock & our lock; retry if reader-count changed
          // (writer count cannot change since we hold lock)
        } while (!mu_.compare_exchange_weak(v, nv,
                                            std::memory_order_release,
                                            std::memory_order_relaxed));
        break;
      }

      // There are waiters.
      // Set h to the head of the circular waiter list.
      PerThreadSynch *h = GetPerThreadSynch(v);
      if ((v & kMuReader) != 0 && (h->readers & kMuHigh) > kMuOne) {
        // a reader but not the last
        h->readers -= kMuOne;  // release our lock
        intptr_t nv = v;       // normally just release spinlock
        if (waitp != nullptr) {  // but waitp!=nullptr => must queue ourselves
          PerThreadSynch *new_h = Enqueue(h, waitp, v, kMuIsCond);
          ABSL_RAW_CHECK(new_h != nullptr,
                         "waiters disappeared during Enqueue()!");
          nv &= kMuLow;
          nv |= kMuWait | reinterpret_cast<intptr_t>(new_h);
        }
        mu_.store(nv, std::memory_order_release);  // release spinlock
        // can release with a store because there were waiters
        break;
      }

      // Either we didn't search before, or we marked the queue
      // as "maybe_unlocking" and no one else should have changed it.
      ABSL_RAW_CHECK(old_h == nullptr || h->maybe_unlocking,
                     "Mutex queue changed beneath us");

      // The lock is becoming free, and there's a waiter
      if (old_h != nullptr &&
          !old_h->may_skip) {                  // we used old_h as a terminator
        old_h->may_skip = true;                // allow old_h to skip once more
        ABSL_RAW_CHECK(old_h->skip == nullptr, "illegal skip from head");
        if (h != old_h && MuEquivalentWaiter(old_h, old_h->next)) {
          old_h->skip = old_h->next;  // old_h not head & can skip to successor
        }
      }
      if (h->next->waitp->how == kExclusive &&
          Condition::GuaranteedEqual(h->next->waitp->cond, nullptr)) {
        // easy case: writer with no condition; no need to search
        pw = h;                       // wake w, the successor of h (=pw)
        w = h->next;
        w->wake = true;
        // We are waking up a writer.  This writer may be racing against
        // an already awake reader for the lock.  We want the
        // writer to usually win this race,
        // because if it doesn't, we can potentially keep taking a reader
        // perpetually and writers will starve.  Worse than
        // that, this can also starve other readers if kMuWrWait gets set
        // later.
        wr_wait = kMuWrWait;
      } else if (w != nullptr && (w->waitp->how == kExclusive || h == old_h)) {
        // we found a waiter w to wake on a previous iteration and either it's
        // a writer, or we've searched the entire list so we have all the
        // readers.
        if (pw == nullptr) {  // if w's predecessor is unknown, it must be h
          pw = h;
        }
      } else {
        // At this point we don't know all the waiters to wake, and the first
        // waiter has a condition or is a reader.  We avoid searching over
        // waiters we've searched on previous iterations by starting at
        // old_h if it's set.  If old_h==h, there's no one to wakeup at all.
        if (old_h == h) {      // we've searched before, and nothing's new
                               // so there's no one to wake.
          intptr_t nv = (v & ~(kMuReader|kMuWriter|kMuWrWait));
          h->readers = 0;
          h->maybe_unlocking = false;   // finished unlocking
          if (waitp != nullptr) {       // we must queue ourselves and sleep
            PerThreadSynch *new_h = Enqueue(h, waitp, v, kMuIsCond);
            nv &= kMuLow;
            if (new_h != nullptr) {
              nv |= kMuWait | reinterpret_cast<intptr_t>(new_h);
            }  // else new_h could be nullptr if we queued ourselves on a
               // CondVar
          }
          // release spinlock & lock
          // can release with a store because there were waiters
          mu_.store(nv, std::memory_order_release);
          break;
        }

        // set up to walk the list
        PerThreadSynch *w_walk;   // current waiter during list walk
        PerThreadSynch *pw_walk;  // previous waiter during list walk
        if (old_h != nullptr) {  // we've searched up to old_h before
          pw_walk = old_h;
          w_walk = old_h->next;
        } else {            // no prior search, start at beginning
          pw_walk =
              nullptr;  // h->next's predecessor may change; don't record it
          w_walk = h->next;
        }

        h->may_skip = false;  // ensure we never skip past h in future searches
                              // even if other waiters are queued after it.
        ABSL_RAW_CHECK(h->skip == nullptr, "illegal skip from head");

        h->maybe_unlocking = true;  // we're about to scan the waiter list
                                    // without the spinlock held.
                                    // Enqueue must be conservative about
                                    // priority queuing.

        // We must release the spinlock to evaluate the conditions.
        mu_.store(v, std::memory_order_release);  // release just spinlock
        // can release with a store because there were waiters

        // h is the last waiter queued, and w_walk the first unsearched waiter.
        // Without the spinlock, the locations mu_ and h->next may now change
        // underneath us, but since we hold the lock itself, the only legal
        // change is to add waiters between h and w_walk.  Therefore, it's safe
        // to walk the path from w_walk to h inclusive. (TryRemove() can remove
        // a waiter anywhere, but it acquires both the spinlock and the Mutex)

        old_h = h;        // remember we searched to here

        // Walk the path upto and including h looking for waiters we can wake.
        while (pw_walk != h) {
          w_walk->wake = false;
          if (w_walk->waitp->cond ==
                  nullptr ||  // no condition => vacuously true OR
              (w_walk->waitp->cond != known_false &&
               // this thread's condition is not known false, AND
               //  is in fact true
               EvalConditionIgnored(this, w_walk->waitp->cond))) {
            if (w == nullptr) {
              w_walk->wake = true;    // can wake this waiter
              w = w_walk;
              pw = pw_walk;
              if (w_walk->waitp->how == kExclusive) {
                wr_wait = kMuWrWait;
                break;                // bail if waking this writer
              }
            } else if (w_walk->waitp->how == kShared) {  // wake if a reader
              w_walk->wake = true;
            } else {   // writer with true condition
              wr_wait = kMuWrWait;
            }
          } else {                  // can't wake; condition false
            known_false = w_walk->waitp->cond;  // remember last false condition
          }
          if (w_walk->wake) {   // we're waking reader w_walk
            pw_walk = w_walk;   // don't skip similar waiters
          } else {              // not waking; skip as much as possible
            pw_walk = Skip(w_walk);
          }
          // If pw_walk == h, then load of pw_walk->next can race with
          // concurrent write in Enqueue(). However, at the same time
          // we do not need to do the load, because we will bail out
          // from the loop anyway.
          if (pw_walk != h) {
            w_walk = pw_walk->next;
          }
        }

        continue;  // restart for(;;)-loop to wakeup w or to find more waiters
      }
      ABSL_RAW_CHECK(pw->next == w, "pw not w's predecessor");
      // The first (and perhaps only) waiter we've chosen to wake is w, whose
      // predecessor is pw.  If w is a reader, we must wake all the other
      // waiters with wake==true as well.  We may also need to queue
      // ourselves if waitp != null.  The spinlock and the lock are still
      // held.

      // This traverses the list in [ pw->next, h ], where h is the head,
      // removing all elements with wake==true and placing them in the
      // singly-linked list wake_list.  Returns the new head.
      h = DequeueAllWakeable(h, pw, &wake_list);

      intptr_t nv = (v & kMuEvent) | kMuDesig;
                                             // assume no waiters left,
                                             // set kMuDesig for INV1a

      if (waitp != nullptr) {  // we must queue ourselves and sleep
        h = Enqueue(h, waitp, v, kMuIsCond);
        // h is new last waiter; could be null if we queued ourselves on a
        // CondVar
      }

      ABSL_RAW_CHECK(wake_list != kPerThreadSynchNull,
                     "unexpected empty wake list");

      if (h != nullptr) {  // there are waiters left
        h->readers = 0;
        h->maybe_unlocking = false;     // finished unlocking
        nv |= wr_wait | kMuWait | reinterpret_cast<intptr_t>(h);
      }

      // release both spinlock & lock
      // can release with a store because there were waiters
      mu_.store(nv, std::memory_order_release);
      break;  // out of for(;;)-loop
    }
    // aggressive here; no one can proceed till we do
    c = synchronization_internal::MutexDelay(c, AGGRESSIVE);
  }                            // end of for(;;)-loop

  if (wake_list != kPerThreadSynchNull) {
    int64_t enqueue_timestamp = wake_list->waitp->contention_start_cycles;
    bool cond_waiter = wake_list->cond_waiter;
    do {
      wake_list = Wakeup(wake_list);              // wake waiters
    } while (wake_list != kPerThreadSynchNull);
    if (!cond_waiter) {
      // Sample lock contention events only if the (first) waiter was trying to
      // acquire the lock, not waiting on a condition variable or Condition.
      int64_t wait_cycles =
          base_internal::CycleClock::Now() - enqueue_timestamp;
      mutex_tracer("slow release", this, wait_cycles);
      ABSL_TSAN_MUTEX_PRE_DIVERT(this, 0);
      submit_profile_data(enqueue_timestamp);
      ABSL_TSAN_MUTEX_POST_DIVERT(this, 0);
    }
  }
}