source/mesh_stream/AsyncFile.h (192 lines of code) (raw):

/** * Copyright 2004-present Facebook. All Rights Reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #ifdef WIN32 #define METHOD 0 // ReadFileScatter #else #define METHOD 1 // async pread // #define METHOD 2 // sync pread #endif #if METHOD == 0 #ifndef NOMINMAX #define NOMINMAX #endif #include <windows.h> #else #include <fcntl.h> #include <sys/uio.h> #include <unistd.h> #include <future> #include <mutex> #endif #include <array> #include <chrono> #include <fstream> #include <map> #include <mutex> #include <string> #include <vector> #ifndef GLOG_NO_ABBREVIATED_SEVERITIES #define GLOG_NO_ABBREVIATED_SEVERITIES #endif #include <glog/logging.h> namespace fb360_dep { const uint64_t kPageSize = 4096; uint64_t align(uint64_t offset, uint64_t alignment) { return (offset + alignment - 1) & ~(alignment - 1); } uint8_t* align(uint8_t* p, uint64_t alignment) { return reinterpret_cast<uint8_t*>(align(reinterpret_cast<uint64_t>(p), alignment)); } struct AsyncFile { #if METHOD == 0 HANDLE handle; struct Segment { // mimic posix iovec struct https://linux.die.net/man/2/readv uint8_t* iov_base; size_t iov_len; }; struct PendingRead { HANDLE handle; OVERLAPPED overlapped; }; AsyncFile(const std::string& filename) { handle = CreateFileA( filename.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING, nullptr); CHECK_NE(handle, INVALID_HANDLE_VALUE) << "error opening " << filename; activityLog().addFile(handle, filename); } void close() const { CloseHandle(handle); } void readBegin(PendingRead& pending, const std::vector<Segment>& segments, uint64_t offset) const { pending.handle = handle; pending.overlapped.Offset = offset & 0xFFFFFFFF; pending.overlapped.OffsetHigh = offset >> 32; pending.overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); pending.overlapped.Internal = 0; pending.overlapped.InternalHigh = 0; // compute the total number of bytes uint64_t total = 0; for (const auto& segment : segments) { uint8_t* dst = segment.iov_base; uint64_t size = segment.iov_len; CHECK(align(dst, kPageSize) == dst); CHECK(align(size, kPageSize) == size); total += segment.iov_len; } // create a FILE_SEGMENT_ELEMENT per page plus an extra for NULL std::vector<FILE_SEGMENT_ELEMENT> pages(total / kPageSize + 1); size_t pageIndex = 0; for (const auto& segment : segments) { uint8_t* dst = segment.iov_base; uint64_t size = segment.iov_len; for (uint64_t i = 0; i < size / kPageSize; ++i) { pages[pageIndex].Buffer = dst; ++pageIndex; dst += kPageSize; } } CHECK_EQ(pageIndex, pages.size() - 1); pages.back().Buffer = nullptr; // kick off the read const BOOL ok = ReadFileScatter( pending.handle, pages.data(), static_cast<DWORD>(total), nullptr, &pending.overlapped); if (!ok) { CHECK_EQ(GetLastError(), ERROR_IO_PENDING); } activityLog().event(handle, offset, 0); } static uint64_t readEnd(PendingRead& pending) { uint64_t offset = pending.overlapped.OffsetHigh; offset = offset << 32 | pending.overlapped.Offset; activityLog().event(pending.handle, offset, 1); DWORD transferred; const BOOL ok = GetOverlappedResult(pending.handle, &pending.overlapped, &transferred, TRUE); CHECK(ok) << "file read error = " << GetLastError(); CloseHandle(pending.overlapped.hEvent); activityLog().event(pending.handle, offset, 2); return transferred; } #else // METHOD == 0 using HANDLE = int; HANDLE handle; using Segment = iovec; using PendingRead = std::future<ssize_t>; AsyncFile(const std::string& filename) { handle = open(filename.c_str(), O_RDONLY); CHECK_NE(handle, -1) << "error opening " << filename; } void close() const { ::close(handle); } ssize_t readSegments(const std::vector<Segment>& segments, uint64_t offset) const { ssize_t total = 0; for (const Segment& segment : segments) { ssize_t result = pread(handle, segment.iov_base, segment.iov_len, offset + total); CHECK_NE(result, -1) << "file read error " << errno; total += result; } return total; } void readBegin(PendingRead& pending, const std::vector<Segment>& segments, uint64_t offset) const { #if defined(__APPLE__) || defined(__linux__) // apple doesn't support preadv and linux crashes pending = std::async(&AsyncFile::readSegments, this, segments, offset); #else // __APPLE__ pending = std::async(preadv, handle, segments.data(), segments.size(), offset); #endif } static uint64_t readEnd(PendingRead& pending) { ssize_t result; result = pending.get(); CHECK_NE(result, -1) << "file read error = " << errno; return result; } #endif // METHOD == 0 struct ActivityLog { ~ActivityLog() { dump("activitylog.tsv"); } // keep a decoder ring to map filehandles to filenames std::map<HANDLE, std::string> filenames; // a request is identified by filehandle, offset using Request = std::pair<HANDLE, uint64_t>; // the events associated with a request are just 3 timestamps static const size_t kEventCount = 3; // begin, get, done using Clock = std::chrono::high_resolution_clock; using Events = std::array<Clock::time_point, kEventCount>; // live requests are stored in a map, completed requests in a vector std::map<Request, Events> live; std::vector<std::pair<Request, Events>> completed; // protect updates with a mutex std::mutex mutex; // call to populate filehandle -> filename decoder ring void addFile(HANDLE filehandle, const std::string& filename) { std::lock_guard<std::mutex> guard(mutex); filenames[filehandle] = filename; } // call when event occurs void event(HANDLE filehandle, uint64_t offset, size_t index) { CHECK_LT(index, kEventCount); Request request(filehandle, offset); Clock::time_point now = Clock::now(); std::lock_guard<std::mutex> guard(mutex); live[request][index] = now; if (index + 1 == kEventCount) { // last event, transfer from live to log completed.emplace_back(request, live[request]); live.erase(request); } } void dump(const std::string& filename) { std::ofstream file(filename); for (const auto& entry : completed) { file << "\"" << filenames[entry.first.first] << "\""; file << "\t" << entry.first.second; for (const auto& timestamp : entry.second) { // offset timestamps from first timestamp static Clock::time_point offset = timestamp; std::chrono::duration<double> elapsed = timestamp - offset; file << "\t" << elapsed.count(); } file << std::endl; } } }; // stick all asyncfile activity into the same activity log static AsyncFile::ActivityLog& activityLog() { static ActivityLog singleton; return singleton; } }; } // namespace fb360_dep