include/ylt/coro_io/coro_file.hpp (556 lines of code) (raw):

/* * Copyright (c) 2023, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <async_simple/Promise.h> #include <async_simple/Traits.h> #include <async_simple/coro/FutureAwaiter.h> #include <cstdint> #include <cstdio> #include <filesystem> #include <fstream> #include <span> #include "async_simple/coro/SyncAwait.h" #include "io_context_pool.hpp" #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) #include <asio/random_access_file.hpp> #include <asio/stream_file.hpp> #endif #include <async_simple/coro/Lazy.h> #include <fcntl.h> #include <asio/error.hpp> #include <cstddef> #include <exception> #include <iostream> #include <memory> #include <string> #include <string_view> #include <system_error> #include <utility> #include <vector> #include "coro_io.hpp" #if defined(ASIO_WINDOWS) #include <io.h> #endif namespace coro_io { /* ┌─────────────┬───────────────────────────────┐ │fopen() mode │ open() flags │ ├─────────────┼───────────────────────────────┤ │ r │ O_RDONLY │ ├─────────────┼───────────────────────────────┤ │ w │ O_WRONLY | O_CREAT | O_TRUNC │ ├─────────────┼───────────────────────────────┤ │ a │ O_WRONLY | O_CREAT | O_APPEND │ ├─────────────┼───────────────────────────────┤ │ r+ │ O_RDWR │ ├─────────────┼───────────────────────────────┤ │ w+ │ O_RDWR | O_CREAT | O_TRUNC │ ├─────────────┼───────────────────────────────┤ │ a+ │ O_RDWR | O_CREAT | O_APPEND │ └─────────────┴───────────────────────────────┘ */ enum flags { #if defined(ASIO_WINDOWS) read_only = 1, write_only = 2, read_write = 4, append = 8, create = 16, exclusive = 32, truncate = 64, create_write = create | write_only, create_write_trunc = create | write_only | truncate, create_read_write_trunc = read_write | create | truncate, create_read_write_append = read_write | create | append, sync_all_on_write = 128 #else // defined(ASIO_WINDOWS) read_only = O_RDONLY, write_only = O_WRONLY, read_write = O_RDWR, append = O_APPEND, create = O_CREAT, exclusive = O_EXCL, truncate = O_TRUNC, create_write = O_CREAT | O_WRONLY, create_write_trunc = O_WRONLY | O_CREAT | O_TRUNC, create_read_write_trunc = O_RDWR | O_CREAT | O_TRUNC, create_read_write_append = O_RDWR | O_CREAT | O_APPEND, sync_all_on_write = O_SYNC #endif // defined(ASIO_WINDOWS) }; constexpr inline flags to_flags(std::ios::ios_base::openmode mode) { flags access = flags::read_write; if (mode == std::ios::in) access = flags::read_only; else if (mode == std::ios::out) access = flags::write_only; else if (mode == std::ios::app) access = flags::append; else if (mode == std::ios::trunc) access = flags::truncate; else if (mode == (std::ios::in | std::ios::out)) access = flags::read_write; else if (mode == (std::ios::trunc | std::ios::out)) access = flags::create_write_trunc; if (mode == (std::ios::in | std::ios::out | std::ios::trunc)) access = create_read_write_trunc; else if (mode == (std::ios::in | std::ios::out | std::ios::app)) access = create_read_write_append; return access; } #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) template <bool seq, typename File, typename Executor> inline bool open_native_async_file(File &file, Executor &executor, std::string_view filepath, flags open_flags) { if (file && file->is_open()) { return true; } try { if constexpr (seq) { file = std::make_shared<asio::stream_file>( executor.get_asio_executor(), std::string(filepath), static_cast<asio::file_base::flags>(open_flags)); } else { file = std::make_shared<asio::random_access_file>( executor.get_asio_executor(), std::string(filepath), static_cast<asio::file_base::flags>(open_flags)); } } catch (std::exception &ex) { ELOG_INFO << "line " << __LINE__ << " coro_file open failed" << ex.what() << "\n"; return false; } return true; } #endif enum class execution_type { none, native_async, thread_pool }; template <execution_type execute_type = execution_type::native_async> class basic_seq_coro_file { public: basic_seq_coro_file(coro_io::ExecutorWrapper<> *executor = coro_io::get_global_block_executor()) : basic_seq_coro_file(executor->get_asio_executor()) {} basic_seq_coro_file(asio::io_context::executor_type executor) : executor_wrapper_(executor) {} basic_seq_coro_file(std::string_view filepath, std::ios::ios_base::openmode open_flags, coro_io::ExecutorWrapper<> *executor = coro_io::get_global_block_executor()) : basic_seq_coro_file(filepath, open_flags, executor->get_asio_executor()) {} basic_seq_coro_file(std::string_view filepath, std::ios::ios_base::openmode open_flags, asio::io_context::executor_type executor) : executor_wrapper_(executor) { open(filepath, open_flags); } bool open(std::string_view filepath, std::ios::ios_base::openmode open_flags) { file_path_ = std::string{filepath}; if constexpr (execute_type == execution_type::thread_pool) { return open_stream_file_in_pool(filepath, open_flags); } else { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) return open_native_async_file<true>(async_seq_file_, executor_wrapper_, filepath, to_flags(open_flags)); #else return open_stream_file_in_pool(filepath, open_flags); #endif } } async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read( char *buf, size_t size) { if constexpr (execute_type == execution_type::thread_pool) { co_return co_await async_read_write({buf, size}); } else { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_seq_file_ == nullptr) { co_return std::make_pair( std::make_error_code(std::errc::invalid_argument), 0); } auto [ec, read_size] = co_await coro_io::async_read( *async_seq_file_, asio::buffer(buf, size)); if (ec == asio::error::eof) { eof_ = true; co_return std::make_pair(std::error_code{}, read_size); } co_return std::make_pair(ec, read_size); #else co_return co_await async_read_write({buf, size}); #endif } } template <bool is_read = true> async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read_write( std::span<char> buf) { auto result = co_await coro_io::post( [this, buf]() -> std::pair<std::error_code, size_t> { if constexpr (is_read) { if (frw_seq_file_.read(buf.data(), buf.size())) { return std::make_pair(std::error_code{}, frw_seq_file_.gcount()); } } else { if (frw_seq_file_.write(buf.data(), buf.size())) { return std::make_pair(std::error_code{}, buf.size()); } } if (frw_seq_file_.eof()) { eof_ = true; return std::make_pair(std::error_code{}, frw_seq_file_.gcount()); } return std::make_pair(std::make_error_code(std::errc::io_error), 0); }, &executor_wrapper_); co_return result.value(); } async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_write( std::string_view buf) { if constexpr (execute_type == execution_type::thread_pool) { co_return co_await async_read_write<false>( std::span(const_cast<char *>(buf.data()), buf.size())); } else { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_seq_file_ == nullptr) { co_return std::make_pair( std::make_error_code(std::errc::invalid_argument), 0); } auto [ec, size] = co_await coro_io::async_write(*async_seq_file_, asio::buffer(buf)); co_return std::make_pair(ec, size); #else co_return co_await async_read_write<false>( std::span(const_cast<char *>(buf.data()), buf.size())); #endif } } #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) std::shared_ptr<asio::stream_file> get_async_stream_file() { return async_seq_file_; } #endif std::fstream &get_stream_file() { return frw_seq_file_; } bool is_open() { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_seq_file_ && async_seq_file_->is_open()) { return true; } #endif return frw_seq_file_.is_open(); } bool eof() { return eof_; } void close() { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_seq_file_ && async_seq_file_->is_open()) { std::error_code ec; async_seq_file_->close(ec); } #endif if (frw_seq_file_.is_open()) { frw_seq_file_.close(); } } bool seek(size_t offset, std::ios_base::seekdir dir) { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_seq_file_ && async_seq_file_->is_open()) { int whence = SEEK_SET; if (dir == std::ios_base::cur) whence = SEEK_CUR; else if (dir == std::ios_base::end) whence = SEEK_END; std::error_code seek_ec; async_seq_file_->seek( offset, static_cast<asio::file_base::seek_basis>(whence), seek_ec); if (seek_ec) { return false; } return true; } #endif if (frw_seq_file_.is_open()) { if (frw_seq_file_.seekg(offset, dir)) { return true; } } return false; } execution_type get_execution_type() { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_seq_file_ && async_seq_file_->is_open()) { return execution_type::native_async; } #endif if (frw_seq_file_.is_open()) { return execution_type::thread_pool; } return execution_type::none; } size_t file_size(std::error_code ec) const noexcept { return std::filesystem::file_size(file_path_, ec); } size_t file_size() const { return std::filesystem::file_size(file_path_); } std::string_view file_path() const { return file_path_; } private: bool open_stream_file_in_pool(std::string_view filepath, std::ios::ios_base::openmode flags) { if (frw_seq_file_.is_open()) { return true; } auto coro_func = coro_io::post( [this, flags, filepath]() mutable { frw_seq_file_.open(filepath.data(), flags); if (!frw_seq_file_.is_open()) { ELOG_INFO << "line " << __LINE__ << " coro_file open failed " << filepath << "\n"; std::cerr << "Error: " << strerror(errno); return false; } return true; }, &executor_wrapper_); auto result = async_simple::coro::syncAwait(coro_func); return result.value(); } coro_io::ExecutorWrapper<> executor_wrapper_; #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) std::shared_ptr<asio::stream_file> async_seq_file_; // seq #endif std::fstream frw_seq_file_; // fread/fwrite seq file std::string file_path_; bool eof_ = false; }; using coro_file = basic_seq_coro_file<>; template <execution_type execute_type = execution_type::native_async> class basic_random_coro_file { public: basic_random_coro_file(coro_io::ExecutorWrapper<> *executor = coro_io::get_global_block_executor()) : basic_random_coro_file(executor->get_asio_executor()) {} basic_random_coro_file(asio::io_context::executor_type executor) : executor_wrapper_(executor) {} basic_random_coro_file(std::string_view filepath, std::ios::ios_base::openmode open_flags, coro_io::ExecutorWrapper<> *executor = coro_io::get_global_block_executor()) : basic_random_coro_file(filepath, open_flags, executor->get_asio_executor()) {} basic_random_coro_file(std::string_view filepath, std::ios::ios_base::openmode open_flags, asio::io_context::executor_type executor) : executor_wrapper_(executor) { open(filepath, open_flags); } bool open(std::string_view filepath, std::ios::ios_base::openmode open_flags) { file_path_ = std::string{filepath}; if constexpr (execute_type == execution_type::thread_pool) { return open_fd(filepath, to_flags(open_flags)); } else { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) return open_native_async_file<false>(async_random_file_, executor_wrapper_, filepath, to_flags(open_flags)); #else return open_fd(filepath, to_flags(open_flags)); #endif } } async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read_at( uint64_t offset, char *buf, size_t size) { if constexpr (execute_type == execution_type::thread_pool) { co_return co_await async_pread(offset, buf, size); } else { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_random_file_ == nullptr) { co_return std::make_pair( std::make_error_code(std::errc::invalid_argument), 0); } auto [ec, read_size] = co_await coro_io::async_read_at( offset, *async_random_file_, asio::buffer(buf, size)); if (ec == asio::error::eof) { eof_ = true; co_return std::make_pair(std::error_code{}, read_size); } co_return std::make_pair(ec, read_size); #else co_return co_await async_pread(offset, buf, size); #endif } } async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_write_at( uint64_t offset, std::string_view buf) { if constexpr (execute_type == execution_type::thread_pool) { co_return co_await async_pwrite(offset, buf.data(), buf.size()); } else { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_random_file_ == nullptr) { co_return std::make_pair( std::make_error_code(std::errc::invalid_argument), 0); } auto [ec, write_size] = co_await coro_io::async_write_at( offset, *async_random_file_, asio::buffer(buf)); co_return std::make_pair(ec, write_size); #else co_return co_await async_pwrite(offset, buf.data(), buf.size()); #endif } } #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) std::shared_ptr<asio::random_access_file> get_async_stream_file() { return async_random_file_; } #endif std::shared_ptr<int> get_pread_file() { return prw_random_file_; } bool is_open() { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_random_file_ && async_random_file_->is_open()) { return true; } #endif return prw_random_file_ != nullptr; } bool eof() { return eof_; } execution_type get_execution_type() { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) if (async_random_file_ && async_random_file_->is_open()) { return execution_type::native_async; } #endif if (prw_random_file_ != nullptr) { return execution_type::thread_pool; } return execution_type::none; } void close() { #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) std::error_code ec; if (async_random_file_) { async_random_file_->close(ec); } #endif prw_random_file_ = nullptr; } size_t file_size(std::error_code ec) const noexcept { return std::filesystem::file_size(file_path_, ec); } size_t file_size() const { return std::filesystem::file_size(file_path_); } std::string_view file_path() const { return file_path_; } private: bool open_fd(std::string_view filepath, int open_flags) { if (prw_random_file_) { return true; } #if defined(ASIO_WINDOWS) int fd = _open(filepath.data(), adjust_flags(open_flags)); #else int fd = ::open(filepath.data(), open_flags); #endif if (fd < 0) { return false; } prw_random_file_ = std::shared_ptr<int>(new int(fd), [](int *ptr) { #if defined(ASIO_WINDOWS) _close(*ptr); #else ::close(*ptr); #endif delete ptr; }); return true; } async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_pread( size_t offset, char *data, size_t size) { #if defined(ASIO_WINDOWS) auto pread = [](int fd, void *buf, uint64_t count, uint64_t offset) -> int64_t { DWORD bytes_read = 0; OVERLAPPED overlapped; memset(&overlapped, 0, sizeof(OVERLAPPED)); overlapped.Offset = offset & 0xFFFFFFFF; overlapped.OffsetHigh = (offset >> 32) & 0xFFFFFFFF; BOOL ok = ReadFile(reinterpret_cast<HANDLE>(_get_osfhandle(fd)), buf, count, &bytes_read, &overlapped); if (!ok && (errno = GetLastError()) != ERROR_HANDLE_EOF) { return -1; } return bytes_read; }; #endif co_return co_await async_prw(pread, true, offset, data, size); } async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_pwrite( size_t offset, const char *data, size_t size) { #if defined(ASIO_WINDOWS) auto pwrite = [](int fd, const void *buf, uint64_t count, uint64_t offset) -> int64_t { DWORD bytes_write = 0; OVERLAPPED overlapped; memset(&overlapped, 0, sizeof(OVERLAPPED)); overlapped.Offset = offset & 0xFFFFFFFF; overlapped.OffsetHigh = (offset >> 32) & 0xFFFFFFFF; BOOL ok = WriteFile(reinterpret_cast<HANDLE>(_get_osfhandle(fd)), buf, count, &bytes_write, &overlapped); if (!ok) { return -1; } return bytes_write; }; #endif co_return co_await async_prw(pwrite, false, offset, (char *)data, size); } async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_prw( auto io_func, bool is_read, size_t offset, char *buf, size_t size) { std::function<int()> func = [=, this] { int fd = *prw_random_file_; return io_func(fd, buf, size, offset); }; std::error_code ec{}; size_t op_size = 0; auto len_val = co_await coro_io::post(std::move(func), &executor_wrapper_); int len = len_val.value(); if (len == 0) { if (is_read) { eof_ = true; } } else if (len > 0) { op_size = len; } else { ec = std::make_error_code(std::errc::io_error); op_size = len; } co_return std::make_pair(ec, op_size); } #if defined(ASIO_WINDOWS) static int adjust_flags(int open_mode) { switch (open_mode) { case flags::read_only: return _O_RDONLY; case flags::write_only: return _O_WRONLY; case flags::read_write: return _O_RDWR; case flags::append: return _O_APPEND; case flags::create: return _O_CREAT; case flags::exclusive: return _O_EXCL; case flags::truncate: return _O_TRUNC; case flags::create_write: return _O_CREAT | _O_WRONLY; case flags::create_write_trunc: return _O_CREAT | _O_WRONLY | _O_TRUNC; case flags::create_read_write_trunc: return _O_RDWR | _O_CREAT | _O_TRUNC; case flags::create_read_write_append: return _O_RDWR | _O_CREAT | _O_APPEND; case flags::sync_all_on_write: default: return open_mode; break; } return open_mode; } #endif coro_io::ExecutorWrapper<> executor_wrapper_; #if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) std::shared_ptr<asio::random_access_file> async_random_file_; // random file #endif std::shared_ptr<int> prw_random_file_ = nullptr; // pread/pwrite random file std::string file_path_; bool eof_ = false; }; using random_coro_file = basic_random_coro_file<>; } // namespace coro_io