in src/fdcache_fdinfo.cpp [320:409]
//
// Read up to [size] bytes starting at file offset [start] into [bytes], going
// through the chunk map held by direct_reader_mgr.
//
// The requested range is split on chunk boundaries (DirectReader::GetChunkSize()).
// For each chunk touched:
//   - chunks outside the window [id - backward_chunks, id + max_prefetch_chunks]
//     are deleted and removed from the map,
//   - the data is copied from the chunk if it is already in the map; otherwise
//     the whole chunk is requested synchronously through direct_read_worker()
//     and then copied.
// If the whole request is served without hitting the end of the file and the
// prefetch chunk count is not 0, prefetch tasks are generated starting from the
// last chunk of the request.
//
// [bytes] destination buffer; up to [size] bytes are written to it.
// [start] file offset of the first byte to read.
// [size]  number of bytes requested.
//
// Returns the number of bytes copied. This is 0 when [size] is 0 or [start] is
// at/after the end of the file, and less than [size] when the end of the file
// is reached. Returns -EIO when a chunk is still missing from the map after the
// synchronous download.
//
ssize_t PseudoFdInfo::DirectReadAndPrefetch(char* bytes, off_t start, size_t size)
{
    S3FS_PRN_DBG("direct read and prefetch[pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", pseudo_fd, physical_fd, static_cast<long long int>(start), size);

    ssize_t rsize    = 0;
    off_t   offset   = start;
    size_t  readsize = size;

    const off_t    chunk_size          = DirectReader::GetChunkSize();
    const uint32_t max_prefetch_chunks = DirectReader::GetPrefetchChunkCount();
    const off_t    file_size           = direct_reader_mgr->GetFileSize();

    if(size == 0 || start >= file_size){
        return 0;
    }

    const uint32_t chunkid_start = offset / chunk_size;
    const uint32_t chunkid_end   = (offset + readsize - 1) / chunk_size;

    for(uint32_t id = chunkid_start; id <= chunkid_end; ++id){
        const off_t  chunk_off   = offset % chunk_size;
        const size_t chunk_len   = std::min(readsize, static_cast<size_t>(chunk_size - chunk_off));
        const off_t  chunk_start = id * chunk_size;
        const off_t  read_pos    = chunk_start + chunk_off;

        // avoid reading the file out of filesize
        size_t real_read_size;
        if(read_pos >= file_size){
            real_read_size = 0;
        }else if(read_pos + static_cast<off_t>(chunk_len) > file_size){
            real_read_size = file_size - read_pos;
        }else{
            real_read_size = chunk_len;
        }

        // The request runs past the end of the file and the file ends exactly on
        // the previous chunk boundary: there is nothing left to copy. Stop here
        // instead of requesting a chunk with a zero/negative length (which could
        // also turn an otherwise successful short read into -EIO).
        if(0 == real_read_size){
            return rsize;
        }

        // The lock object lives until the end of this loop iteration.
        AutoLock auto_lock(&direct_reader_mgr->direct_read_lock);

        // Release chunks to reduce memory usage
        for(auto iter = direct_reader_mgr->chunks.begin(); iter != direct_reader_mgr->chunks.end(); ){
            // keep the one before the current chunk without releasing it. Because we assume that when
            // the read offset is in the previous chunk, it is still read sequentially.
            // keep chunks in [id-backward_chunks, id+max_prefetch_chunks]
            const uint32_t chunkid = iter->first;
            if(chunkid + DirectReader::GetBackwardChunks() >= id && chunkid <= id + max_prefetch_chunks){
                ++iter;
            }else{
                S3FS_PRN_DBG("release chunk[pseudo_fd=%d][chunkid=%u]", pseudo_fd, chunkid);
                delete iter->second;
                iter = direct_reader_mgr->chunks.erase(iter);
            }
        }

        auto chunk_iter = direct_reader_mgr->chunks.find(id);
        if(chunk_iter != direct_reader_mgr->chunks.end()){
            S3FS_PRN_DBG("reading from buffer[chunkid=%u][offset=%lld][chunk_off=%lld][real_read_size=%zu]", id, static_cast<long long int>(offset), static_cast<long long int>(chunk_off), real_read_size);
        }else{
            // if the chunk does not exist, we should download it from oss directly.
            S3FS_PRN_DBG("reading from cloud[chunkid=%u][start=%lld][chunk_off=%lld][real_read_size=%zu]", id, static_cast<long long int>(offset), static_cast<long long int>(chunk_off), real_read_size);

            // Request the whole chunk, truncated at the end of the file.
            const off_t direct_read_size = std::min(chunk_size, file_size - chunk_start);

            // NOTE(review): the parameter object is not freed here; presumably
            // direct_read_worker() takes ownership of it - confirm.
            DirectReadParam* direct_read_param = new DirectReadParam;
            direct_read_param->len              = direct_read_size;
            direct_read_param->start            = chunk_start;
            direct_read_param->direct_reader    = direct_reader_mgr;
            direct_read_param->is_sync_download = true;
            direct_read_worker(static_cast<void*>(direct_read_param));

            // Look the chunk up again: it is expected to be in the map now.
            chunk_iter = direct_reader_mgr->chunks.find(id);
            if(chunk_iter == direct_reader_mgr->chunks.end()){
                return -EIO;
            }
        }

        assert(chunk_off + static_cast<off_t>(real_read_size) <= chunk_iter->second->size);
        memcpy(bytes, chunk_iter->second->buf + chunk_off, real_read_size);

        if(real_read_size < chunk_len){
            // finish reading the file.
            return rsize + static_cast<ssize_t>(real_read_size);
        }

        offset   += chunk_len;
        readsize -= chunk_len;
        bytes    += chunk_len;
        rsize    += chunk_len;
    }

    if(max_prefetch_chunks != 0){
        uint32_t prefetch_cnt = GetPrefetchCount(start, size);
        GeneratePrefetchTask(chunkid_end, prefetch_cnt);
    }
    return rsize;
}