ssize_t PseudoFdInfo::DirectReadAndPrefetch()

in src/fdcache_fdinfo.cpp [320:409]


ssize_t PseudoFdInfo::DirectReadAndPrefetch(char* bytes, off_t start, size_t size)
{
    S3FS_PRN_DBG("direct read and prefetch[pseudo_fd=%d][physical_fd=%d][offset=%lld][size=%zu]", pseudo_fd, physical_fd, static_cast<long long int>(start), size);

    ssize_t rsize = 0;
    off_t offset = start;
    size_t readsize = size;
    
    const off_t chunk_size = DirectReader::GetChunkSize();
    const uint32_t max_prefetch_chunks = DirectReader::GetPrefetchChunkCount();
    off_t file_size = direct_reader_mgr->GetFileSize();
    if(size == 0 || start >= file_size){
        return 0;
    }
    
    uint32_t chunkid_start = offset / chunk_size;
    uint32_t chunkid_end = (offset + readsize - 1) / chunk_size;
    for (uint32_t id = chunkid_start; id <= chunkid_end; id++) {
        off_t chunk_off = offset % chunk_size;
        size_t chunk_len = std::min(readsize, static_cast<size_t>(chunk_size-chunk_off));

        // avoid reading the file out of filesize
        size_t real_read_size = 0;
        if (id * chunk_size + chunk_off >= file_size) {
            real_read_size = 0;
        } else if(id * chunk_size + chunk_off + static_cast<off_t>(chunk_len) > file_size){
            real_read_size = file_size - id * chunk_size - chunk_off;
        } else {
            real_read_size = chunk_len;
        }

        AutoLock auto_lock(&direct_reader_mgr->direct_read_lock);
        
        // Release chunks to reduce memory usage
        for (auto iter = direct_reader_mgr->chunks.begin(); iter!= direct_reader_mgr->chunks.end(); ) {
            // keep the one before the current chunk without releasing it. Because we assume that when 
            // the read offset is in the previous chunk, it is still read sequentially.
            // keep chunks in [id-backward_chunks, id+max_prefetch_chunks]
            uint32_t chunkid = iter->first;
            if(chunkid + DirectReader::GetBackwardChunks() >= id && chunkid <= id + max_prefetch_chunks){
                iter++;
            } else {
                S3FS_PRN_DBG("release chunk[pseudo_fd=%d][chunkid=%d]", pseudo_fd, chunkid);
                delete iter->second;
                iter->second = NULL;
                iter = direct_reader_mgr->chunks.erase(iter);
            }
        }
        
        if (direct_reader_mgr->chunks.count(id)) {
            S3FS_PRN_DBG("reading from buffer[chunkid=%d][offset=%ld][chunk_off=%ld][real_read_size=%ld]", id, offset, chunk_off, real_read_size);
            assert(chunk_off + static_cast<off_t>(real_read_size) <= direct_reader_mgr->chunks[id]->size);
            memcpy(bytes, direct_reader_mgr->chunks[id]->buf + chunk_off, real_read_size);
        } else {
            // if the chunk does not exist, we should download it from oss directly.
            S3FS_PRN_DBG("reading from cloud[chunkid=%d][start=%ld][chunk_off=%ld][real_read_size=%ld]", id, offset, chunk_off,real_read_size);
            off_t direct_read_size = std::min(chunk_size, file_size - id * chunk_size);
            DirectReadParam* direct_read_param = new DirectReadParam;
            direct_read_param->len = direct_read_size;
            direct_read_param->start = id * chunk_size;
            direct_read_param->direct_reader = direct_reader_mgr;
            direct_read_param->is_sync_download = true;
            direct_read_worker(static_cast<void*>(direct_read_param));
            
            if (direct_reader_mgr->chunks.count(id)) {
                assert(chunk_off + static_cast<off_t>(real_read_size) <= direct_reader_mgr->chunks[id]->size);
                memcpy(bytes, direct_reader_mgr->chunks[id]->buf + chunk_off, real_read_size);
            } else {
                return -EIO;
            }
        }

        if (real_read_size < chunk_len) { 
            // finish reading the file.
            return rsize + real_read_size;   
        }
        
        offset += chunk_len;
        readsize -= chunk_len;
        bytes += chunk_len;
        rsize += chunk_len;
    }

    if(max_prefetch_chunks != 0){
        uint32_t prefetch_cnt = GetPrefetchCount(start, size);
        GeneratePrefetchTask(chunkid_end, prefetch_cnt);
    }

    return rsize;
}