in src/environment/environment_windows.cc [886:947]
/// Issues one overlapped read or write against file_handle_.
///
/// A heap-allocated IOCallbackContext carries the handler, the callback, a deep
/// copy of the caller's context and the OVERLAPPED structure for the request.
///
/// @param operationType OperationType::Read issues ::ReadFile; any other value
///                      issues ::WriteFile.
/// @param buffer        Buffer handed to ReadFile/WriteFile.
/// @param offset        Byte offset in the file, split across
///                      OVERLAPPED::Offset / OVERLAPPED::OffsetHigh.
/// @param length        Number of bytes to transfer.
/// @param callback      Stored in the IO context (presumably invoked by the
///                      completion path, which is outside this function).
/// @param context       Caller context; deep-copied, so the caller keeps
///                      ownership of the original. Must not be null.
/// @return Status::OutOfMemory() if the IO context cannot be allocated; the
///         failing status of DeepCopy() or of threadpool_->Schedule(); a
///         Status::IOError carrying the formatted Win32 error if the IO could
///         not be started; Status::OK() otherwise.
Status WindowsAsyncIOHandler::ScheduleOperation(OperationType operationType,
    void* buffer, size_t offset, uint32_t length,
    AsyncIOHandler::AsyncCallback callback, IAsyncContext* context) {
  // Cleanup used on every early-return path below. The caller context may not
  // have been attached yet (e.g. DeepCopy failed), so only delete it when set.
  unique_ptr_t<IOCallbackContext> io_context(
      reinterpret_cast<IOCallbackContext*>(Allocator::Get()->Allocate(
          sizeof(IOCallbackContext))),
      [](IOCallbackContext* c) {
        if(c->caller_context != nullptr) {
          Status s = c->caller_context->DeepDelete();
          ALWAYS_ASSERT(s.ok());
        }
        Allocator::Get()->Free(c);
      });
  if(!io_context.get()) return Status::OutOfMemory();
  new(io_context.get()) IOCallbackContext();
  // Make the "no caller context attached yet" state explicit so the deleter's
  // null check is reliable regardless of how the constructor initializes it.
  io_context->caller_context = nullptr;

  IAsyncContext* caller_context_copy = nullptr;
  RETURN_NOT_OK(context->DeepCopy(&caller_context_copy));

  io_context->handler = this;
  io_context->callback = callback;
  io_context->caller_context = caller_context_copy;

  ::memset(&(io_context->overlapped), 0, sizeof(OVERLAPPED));
  // Widen before splitting: shifting a 32-bit size_t by 32 would be undefined.
  const uint64_t offset64 = static_cast<uint64_t>(offset);
  io_context->overlapped.Offset =
      static_cast<DWORD>(offset64 & 0xffffffffllu);
  io_context->overlapped.OffsetHigh = static_cast<DWORD>(offset64 >> 32);

  // Must precede the overlapped call on a handle bound to a threadpool IO
  // object.
  ::StartThreadpoolIo(io_object_);

  BOOL success = FALSE;
  DWORD bytes_transferred = 0;
  if(OperationType::Read == operationType) {
    success = ::ReadFile(file_handle_, buffer, length, &bytes_transferred,
                         &io_context->overlapped);
  } else {
    success = ::WriteFile(file_handle_, buffer, length, &bytes_transferred,
                          &io_context->overlapped);
  }

  if(!success) {
    DWORD win32_result = ::GetLastError();
    // Any error other than ERROR_IO_PENDING means the IO failed. Otherwise it
    // will finish asynchronously on the threadpool
    if(ERROR_IO_PENDING != win32_result) {
      // No completion will be delivered, so undo StartThreadpoolIo.
      ::CancelThreadpoolIo(io_object_);
      std::stringstream ss;
      ss << "Failed to schedule async IO: " <<
         FormatWin32AndHRESULT(win32_result);
      LOG(ERROR) << ss.str();
      return Status::IOError(ss.str());
    }
  } else {
    // The IO finished synchronously. Even though the IO was threadpooled, NTFS
    // may finish the IO synchronously (especially on VMs). To honor the fully
    // async call contract, schedule the completion on a separate thread using
    // the threadpool.
    //
    // NOTE(review): this path does not call CancelThreadpoolIo. If the handle
    // uses FILE_SKIP_COMPLETION_PORT_ON_SUCCESS, the Win32 docs require it
    // here; if it does not, the threadpool IO callback will presumably also
    // fire for this request. Confirm against where io_object_ is created.
    //
    // The requested length is reported rather than the local
    // bytes_transferred value written by ReadFile/WriteFile.
    io_context->bytes_transferred = length;
    RETURN_NOT_OK(threadpool_->Schedule(io_priority_, FinishSyncIOAsyncTask,
                                        reinterpret_cast<void*>(io_context.get())));
  }

  // The request is in flight (or its completion task is queued): give up
  // ownership so the deleter above does not free a context still in use.
  io_context.release();
  return Status::OK();
}