Status WindowsAsyncIOHandler::ScheduleOperation()

in src/environment/environment_windows.cc [886:947]


// Issues an overlapped read or write of `length` bytes at byte `offset` of
// file_handle_, using `buffer` as the destination (read) or source (write).
//
// A deep copy of `context` is taken and stored, together with `callback` and
// this handler, in a freshly allocated IOCallbackContext whose OVERLAPPED
// member is handed to ReadFile/WriteFile. On success, ownership of that
// IOCallbackContext is released from this function (presumably to the
// completion path, which is expected to free it -- not visible here).
//
// Returns:
//   - Status::OutOfMemory() if the IOCallbackContext cannot be allocated.
//   - The failing status of context->DeepCopy() or threadpool_->Schedule(),
//     propagated through RETURN_NOT_OK.
//   - Status::IOError(...) if ReadFile/WriteFile fails with anything other
//     than ERROR_IO_PENDING.
//   - Status::OK() once the IO is pending or its completion has been queued.
Status WindowsAsyncIOHandler::ScheduleOperation(OperationType operationType,
    void* buffer, size_t offset, uint32_t length,
    AsyncIOHandler::AsyncCallback callback, IAsyncContext* context) {

  // The deleter only runs on early-return paths (before release() below).
  // caller_context may still be null there if DeepCopy failed, so guard it.
  unique_ptr_t<IOCallbackContext> io_context(
    reinterpret_cast<IOCallbackContext*>(Allocator::Get()->Allocate(
        sizeof(IOCallbackContext))),
  [](IOCallbackContext* c) {
    if(c->caller_context) {
      Status s = c->caller_context->DeepDelete();
      ALWAYS_ASSERT(s.ok());
    }
    Allocator::Get()->Free(c);
  });

  if(!io_context.get()) return Status::OutOfMemory();
  new(io_context.get()) IOCallbackContext();
  // Make the "no copy yet" state explicit so the deleter above is safe no
  // matter how IOCallbackContext's constructor initializes this member.
  io_context->caller_context = nullptr;

  IAsyncContext* caller_context_copy = nullptr;
  RETURN_NOT_OK(context->DeepCopy(&caller_context_copy));

  io_context->handler = this;
  io_context->callback = callback;
  io_context->caller_context = caller_context_copy;

  // Split the 64-bit file offset across the two 32-bit OVERLAPPED fields.
  // Widen first so the shift is well defined even where size_t is 32 bits.
  const uint64_t file_offset = static_cast<uint64_t>(offset);
  ::memset(&(io_context->overlapped), 0, sizeof(OVERLAPPED));
  io_context->overlapped.Offset =
      static_cast<DWORD>(file_offset & 0xffffffffllu);
  io_context->overlapped.OffsetHigh = static_cast<DWORD>(file_offset >> 32);

  // Must precede every overlapped operation issued on io_object_'s handle.
  ::StartThreadpoolIo(io_object_);

  BOOL success = FALSE;
  DWORD bytes_transferred = 0;
  if(OperationType::Read == operationType) {
    success = ::ReadFile(file_handle_, buffer, length, &bytes_transferred,
        &io_context->overlapped);
  } else {
    success = ::WriteFile(file_handle_, buffer, length, &bytes_transferred,
        &io_context->overlapped);
  }
  if(!success) {
    DWORD win32_result = ::GetLastError();
    // Any error other than ERROR_IO_PENDING means the IO failed. Otherwise it
    // will finish asynchronously on the threadpool.
    if(ERROR_IO_PENDING != win32_result) {
      // No completion will be delivered for a failed submission, so undo the
      // StartThreadpoolIo() above.
      ::CancelThreadpoolIo(io_object_);
      std::stringstream ss;
      ss << "Failed to schedule async IO: " <<
          FormatWin32AndHRESULT(win32_result);
      LOG(ERROR) << ss.str();
      return Status::IOError(ss.str());
    }
  } else {
    // The IO finished synchronously. Even though the IO was threadpooled, NTFS
    // may finish the IO synchronously (especially on VMs). To honor the fully
    // async call contract, schedule the completion on a separate thread using
    // the threadpool.
    //
    // Report the byte count the OS actually transferred rather than the
    // requested length, so a short transfer is not reported as a full one.
    //
    // NOTE(review): this path presumably relies on the handle being in
    // FILE_SKIP_COMPLETION_PORT_ON_SUCCESS mode; if so, the matching
    // CancelThreadpoolIo() must happen somewhere (e.g. in
    // FinishSyncIOAsyncTask) -- confirm against the rest of the file.
    io_context->bytes_transferred = bytes_transferred;
    RETURN_NOT_OK(threadpool_->Schedule(io_priority_, FinishSyncIOAsyncTask,
        reinterpret_cast<void*>(io_context.get())));
  }
  // The IO is now pending or its completion is queued; give up ownership so
  // the context outlives this call.
  io_context.release();
  return Status::OK();
}