in tensorflow_io/core/kernels/gsmemcachedfs/memcached_file_block_cache.cc [531:676]
// Reads `n` bytes of `filename` starting at `offset` into `buffer`, serving the
// data through the block cache where possible.
//
// Lookup order:
//   1. If the cache is disabled or not configured, the read is passed straight
//      through to `block_fetcher_` without being split into blocks.
//   2. Reads smaller than one block ("mini reads") first consult
//      `local_cache_`, keyed by the memcached key of the block containing
//      `offset`; a hit returns immediately.
//   3. Larger reads, when `use_multi_get_` is set, request all blocks in one
//      multi-get using a client ticket taken from `client_queue_`. A multi-get
//      failure is only logged; unresolved claims fall through to step 4.
//   4. Remaining claims are fetched one block at a time, in ascending offset
//      order, via `MaybeFetch` and spliced into `buffer` by the collator.
//
// On return, `*bytes_transferred` holds the number of bytes copied into
// `buffer`. Returns OutOfRange if `offset` is at or beyond the end of a fetched
// block's data, and propagates any error returned by `MaybeFetch`.
Status MemcachedFileBlockCache::Read(const string& filename, size_t offset,
                                     size_t n, char* buffer,
                                     size_t* bytes_transferred) {
  *bytes_transferred = 0;
  if (n == 0) {
    return Status::OK();
  }
  VLOG(2) << "original read: offset=" << offset << ", n=" << n
          << ", filename=" << filename;
  if (!IsCacheEnabled() || !configured_) {
    // The cache is effectively disabled, so we pass the read through to the
    // fetcher without breaking it up into blocks.
    return block_fetcher_(filename, offset, n, buffer, bytes_transferred);
  }

  // Takes a memcached client ticket from the pool. Returns 0, which callers
  // treat as "no ticket", when the pool is empty.
  auto acquire_client = [this]() -> int64 {
    mutex_lock lock(get_mu_);
    if (client_queue_.empty()) {
      LOG(WARNING) << "Memcached client pool is oversaturated. Read will "
                      "skip the block cache.";
      return 0;
    }
    const int64 client_index = client_queue_.front();
    client_queue_.pop_front();
    return client_index;
  };
  // Puts a ticket obtained from `acquire_client` back into the pool; a zero
  // index (no ticket) is ignored.
  auto release_client = [this](int64 client_index) {
    if (client_index > 0) {
      mutex_lock lock(get_mu_);
      client_queue_.push_back(client_index);
    }
  };

  auto start_time = absl::Now();
  BufferCollator collator(offset, n, buffer, block_size_);
  collator.prepare_collation();
  std::vector<string> keys;
  std::map<string, Key> claim_checks;
  for (const auto pos : collator.positions()) {
    const Key key = std::make_pair(filename, pos);
    const string memc_key = MakeMemcachedKey(key);
    keys.push_back(memc_key);
    claim_checks.insert(std::make_pair(memc_key, key));
  }

  string mini_read_key = keys[0];
  const bool mini_read = n < block_size_;
  if (mini_read) {
    // Small reads get cached locally since we need to fetch an entire block
    // remotely from either GCS or the distributed cache.
    int64 block_offset = offset - offset % block_size_;
    const Key key = std::make_pair(filename, block_offset);
    mini_read_key = MakeMemcachedKey(key);
    int64 offset_in_block = offset - block_offset;
    if (!local_cache_->Peek(mini_read_key)) {
      local_cache_->Fetching(mini_read_key);
    }
    if (local_cache_->Get(mini_read_key, offset_in_block, n, buffer,
                          bytes_transferred)) {
      return Status::OK();
    }
  }

  size_t total_bytes_transferred = 0;
  const bool multi_get = use_multi_get_ && !mini_read;
  if (multi_get) {
    const int64 client_index = acquire_client();
    if (client_index > 0) {
      auto before = absl::Now();
      Status mget_status = read_with_multi_get(
          collator, memcached_clients_[client_index], keys, &claim_checks,
          &total_bytes_transferred, cache_stats_);
      auto after = absl::Now();
      VLOG(2) << "memc mget: " << (after - before) << ", status "
              << mget_status;
      release_client(client_index);
    }
  }

  // At this point, any claims remaining in claim_checks were not retrievable
  // via multi-get, meaning that they were cache misses. Each of these should
  // be retried serially, dispatching GCS fetches and setting blocks in the
  // cache.
  VLOG(2) << "Serial fetch of " << claim_checks.size() << " claims";
  // When the cache is completely cold for this region of the GCS file, every
  // one of the requests will be a miss. Filling these requests in random
  // offset order is likely less efficient for GCS than filling them in
  // sequence. Order the claims by offset.
  std::map<size_t, Key> sorted_claims;
  for (const auto& claim_check : claim_checks) {
    sorted_claims.insert(
        std::make_pair(claim_check.second.second, claim_check.second));
  }
  for (const auto& claim : sorted_claims) {
    const size_t pos = claim.first;
    std::vector<char> data;
    // In multi-get mode no per-block ticket is taken here; client index 0 is
    // passed to MaybeFetch (same as before this change).
    const int64 client_index = multi_get ? 0 : acquire_client();
    Status fetch_status = MaybeFetch(client_index, claim.second, &data);
    // Return the ticket BEFORE checking the status: returning early on error
    // while still holding it would permanently shrink the client pool.
    release_client(client_index);
    // NOTE(review): on this error return and the EOF return below,
    // `local_cache_->Fetched(mini_read_key)` is not called even though
    // `Fetching` may have been called above; confirm the local cache tolerates
    // an entry left in that state.
    TF_RETURN_IF_ERROR(fetch_status);
    // Copy the relevant portion of the block into the result buffer.
    if (offset >= pos + data.size()) {
      // The requested offset is at or beyond the end of the file. This can
      // happen if `offset` is not block-aligned, and the read returns the
      // last block in the file, which does not extend all the way out to
      // `offset`.
      *bytes_transferred = total_bytes_transferred;
      return errors::OutOfRange("EOF at offset ", offset, " in file ", filename,
                                " at position ", pos, " with data size ",
                                data.size());
    }
    // A small read that straddles a block boundary fetches two blocks, but
    // only the block containing `offset` belongs under `mini_read_key`.
    // Storing the following block's bytes under that key would make later
    // local-cache hits return the wrong data.
    if (mini_read && MakeMemcachedKey(claim.second) == mini_read_key) {
      // Add the fetched block to the local cache when serving small read.
      local_cache_->Add(mini_read_key, data.size(), data.data());
      local_cache_->Fetched(mini_read_key);
    }
    if (!collator.splice_buffer(data.begin(), data.end(), pos,
                                &total_bytes_transferred)) {
      break;
    }
  }
  auto finish_time = absl::Now();
  auto elapsed = finish_time - start_time;
  VLOG(2) << "total_bytes_transferred out " << total_bytes_transferred
          << "; rate "
          << (total_bytes_transferred / absl::ToDoubleSeconds(elapsed))
          << " bytes / second";
  *bytes_transferred = total_bytes_transferred;
  return Status::OK();
}