Status MemcachedFileBlockCache::Read()

in tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc [531:676]


Status MemcachedFileBlockCache::Read(const string& filename, size_t offset,
                                     size_t n, char* buffer,
                                     size_t* bytes_transferred) {
  *bytes_transferred = 0;
  if (n == 0) {
    return Status::OK();
  }
  VLOG(2) << "original read: offset=" << offset << ", n=" << n
          << ", filename=" << filename;

  if (!IsCacheEnabled() || !configured_) {
    // The cache is effectively disabled, so we pass the read through to the
    // fetcher without breaking it up into blocks.
    return block_fetcher_(filename, offset, n, buffer, bytes_transferred);
  }
  auto start_time = absl::Now();
  BufferCollator collator(offset, n, buffer, block_size_);
  collator.prepare_collation();
  std::vector<string> keys;
  std::map<string, Key> claim_checks;
  for (const auto pos : collator.positions()) {
    const Key key = std::make_pair(filename, pos);
    const string memc_key = MakeMemcachedKey(key);
    keys.push_back(memc_key);
    claim_checks.insert(std::make_pair(memc_key, key));
  }
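  // Illustrative example (block_size_ of 128 KiB assumed for this comment,
  // not taken from the original source): a read of n=200000 bytes at
  // offset=100000 covers bytes [100000, 300000), which spans the
  // block-aligned positions {0, 131072, 262144}. One memcached key is built
  // per block, and claim_checks maps each key back to its (filename, offset)
  // pair so that misses can be identified after the multi-get below.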

  string mini_read_key = keys[0];
  bool mini_read = n < block_size_;

  if (mini_read) {
    // Small reads get cached locally since we need to fetch an entire block
    // remotely from either GCS or the distributed cache.
    int64 block_offset = offset - offset % block_size_;
    const Key key = std::make_pair(filename, block_offset);
    mini_read_key = MakeMemcachedKey(key);
    int64 offset_in_block = offset - block_offset;
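    // Worked example (again assuming a 128 KiB block_size_): for
    // offset=200000, block_offset = 200000 - (200000 % 131072) = 131072 and
    // offset_in_block = 200000 - 131072 = 68928, i.e. the small read is
    // served from byte 68928 of the cached block that starts at 131072.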
    if (!local_cache_->Peek(mini_read_key)) {
      local_cache_->Fetching(mini_read_key);
    }
    if (local_cache_->Get(mini_read_key, offset_in_block, n, buffer,
                          bytes_transferred)) {
      return Status::OK();
    }
  }

  size_t total_bytes_transferred = 0;
  bool multi_get = use_multi_get_ && !mini_read;

  if (multi_get) {
    int64 client_index = 0;
    {
      mutex_lock lock(get_mu_);

      if (!client_queue_.empty()) {
        client_index = client_queue_.front();
        client_queue_.pop_front();
      } else {
        LOG(WARNING) << "Memcached client pool is oversaturated. Read will "
                        "skip the block cache.";
      }
    }

    if (client_index > 0) {
      auto before = absl::Now();
      Status mget_status = read_with_multi_get(
          collator, memcached_clients_[client_index], keys, &claim_checks,
          &total_bytes_transferred, cache_stats_);
      auto after = absl::Now();
      VLOG(2) << "memc mget: " << (after - before) << ", status "
              << mget_status;
      mutex_lock lock(get_mu_);
      client_queue_.push_back(client_index);
    }
  }

  // At this point, any claims remaining in claim_checks were not retrievable
  // via multi-get, meaning that they were cache misses.  Each of these should
  // be retried serially, dispatching GCS fetches and setting blocks in the
  // cache.
  VLOG(2) << "Serial fetch of " << claim_checks.size() << " claims";
  // When the cache is completely cold for this region of the GCS file, every
  // one of the requests will be a miss. Filling these requests in random
  // offset order is likely less efficient for GCS than filling them in
  // sequence.  Order the claims by offset.
  std::map<size_t, Key> sorted_claims;
  for (auto ci = claim_checks.begin(); ci != claim_checks.end(); ++ci) {
    sorted_claims.insert(std::make_pair(ci->second.second, ci->second));
  }
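  // For instance, if the multi-get above missed the blocks at offsets
  // {262144, 0, 131072}, sorted_claims (a std::map keyed by offset) yields
  // them as {0, 131072, 262144}, so the serial fetches below walk the GCS
  // file front to back.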
  for (auto sc = sorted_claims.begin(); sc != sorted_claims.end(); ++sc) {
    size_t pos = sc->first;
    std::vector<char> data;

    int64 client_index = 0;
    if (!multi_get) {
      mutex_lock lock(get_mu_);
      // Get a client ticket from the pool if available.
      if (!client_queue_.empty()) {
        client_index = client_queue_.front();
        client_queue_.pop_front();
      } else {
        LOG(WARNING) << "Memcached client pool is oversaturated. Read will "
                        "skip the block cache.";
      }
    }

    TF_RETURN_IF_ERROR(MaybeFetch(client_index, sc->second, &data));

    if (client_index > 0) {
      mutex_lock lock(get_mu_);
      // Put client ticket back in the pool.
      client_queue_.push_back(client_index);
    }

    // Copy the relevant portion of the block into the result buffer.
    if (offset >= pos + data.size()) {
      // The requested offset is at or beyond the end of the file. This can
      // happen if `offset` is not block-aligned, and the read returns the
      // last block in the file, which does not extend all the way out to
      // `offset`.
      *bytes_transferred = total_bytes_transferred;
      return errors::OutOfRange("EOF at offset ", offset, " in file ", filename,
                                " at position ", pos, " with data size ",
                                data.size());
    }

    if (mini_read) {
      // Add the fetched block to the local cache when serving a small read.
      local_cache_->Add(mini_read_key, data.size(), data.data());
      local_cache_->Fetched(mini_read_key);
    }

    if (!collator.splice_buffer(data.begin(), data.end(), pos,
                                &total_bytes_transferred)) {
      break;
    }
  }
  auto finish_time = absl::Now();
  auto elapsed = finish_time - start_time;
  VLOG(2) << "total_bytes_transferred out " << total_bytes_transferred
          << "; rate "
          << (total_bytes_transferred / absl::ToDoubleSeconds(elapsed))
          << " bytes / second";
  *bytes_transferred = total_bytes_transferred;
  return Status::OK();
}
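
For context, a minimal caller-side sketch of the contract this method exposes. The function name, the `cache` pointer, the object path, and the buffer size are hypothetical, chosen for illustration and not part of the excerpt above; the point is the two observable outcomes: OK with `*bytes_transferred` set, or OutOfRange when the requested offset lies past EOF, in which case `*bytes_transferred` still reports the bytes copied before the short read was detected.

// Hypothetical usage sketch; names are assumptions, not from the source.
Status ReadExample(MemcachedFileBlockCache* cache) {
  std::vector<char> scratch(1 << 20);  // 1 MiB destination buffer.
  size_t bytes_read = 0;
  Status s = cache->Read("gs://bucket/object", /*offset=*/0,
                         scratch.size(), scratch.data(), &bytes_read);
  if (errors::IsOutOfRange(s)) {
    // Short read at EOF: bytes_read still holds the bytes actually copied,
    // because Read() sets *bytes_transferred before returning OutOfRange.
    s = Status::OK();
  }
  return s;
}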