int CompactCache::callBucketFn()

in cachelib/compact_cache/CCache-inl.h [431:514]


int CompactCache<C, A, B>::callBucketFn(
    const Key& key,
    Operation op,
    const std::chrono::microseconds& timeout,
    Fn f,
    Args... args) {
  if (numChunks_ == 0) {
    return -1;
  }

  /* 1) Increase the refcount of the current cohort. */
  Cohort::Token tok = cohort_.incrActiveReqs();

  /* 2) Find the hash table bucket for the key. */
  Bucket* bucket = tableFindBucket(key);

  /* 3) Lock the bucket. Immutable bucket is a parameter
   * regarding whether we're allowed to modify the bucket in any way,
   * meaning we take an exclusive lock, or not, meaning we take a
   * shared lock for reads without promotion. We may need to promote
   * in a second pass in the latter case. */
  BucketReturn rv;
  bool immutable_bucket = (op == Operation::READ);

  /* 4) Call the request handler. */
  if (immutable_bucket) {
    auto lock = locks_.lockShared(timeout, bucket);
    if (!lock.locked()) {
      XDCHECK(timeout > std::chrono::microseconds::zero());
      ++stats_.tlStats().lockTimeout;
      return -2;
    }

    rv = (this->*f)(bucket, key, args...);
  } else {
    auto lock = locks_.lockExclusive(timeout, bucket);
    if (!lock.locked()) {
      XDCHECK(timeout > std::chrono::microseconds::zero());
      ++stats_.tlStats().lockTimeout;
      return -2;
    }

    rv = (this->*f)(bucket, key, args...);
  }

  /* 5.5) Promote if necessary from a read operation */
  if (UNLIKELY(rv == BucketReturn::PROMOTE)) {
    XDCHECK(immutable_bucket);
    XDCHECK_EQ(op, Operation::READ);

    rv = BucketReturn::FOUND;

    if (allowPromotions_) {
      auto lock = locks_.lockExclusive(timeout, bucket);
      if (!lock.locked()) {
        XDCHECK(timeout > std::chrono::microseconds::zero());
        ++stats_.tlStats().promoteTimeout;
      } else {
        this->bucketPromote(bucket, key);
      }
    }
  }
  XDCHECK(rv != BucketReturn::PROMOTE);

  /* 6) Do the operation on the new location if necessary
   * Note that although we release the old lock, the order is very
   * important. The rehasher will be doing lock old -> read old ->
   * lock new -> write new -> unlock both
   * So, we need to make sure that our old update either happens
   * before the rehasher reads, or if it happens after then the new update
   * also happens after. IE new update has to be second.
   */
  Bucket* bucketDbl =
      (op == Operation::WRITE) ? tableFindDblWriteBucket(key) : nullptr;

  if (bucketDbl != nullptr) {
    auto lockDbl = locks_.lockExclusive(bucketDbl);
    if ((this->*f)(bucketDbl, key, args...) == BucketReturn::ERROR) {
      rv = BucketReturn::ERROR;
    }
  }
  XDCHECK_NE(toInt(rv), 2);
  return toInt(rv);
}