in win32/src/async_socket_win32.c [652:819]
/*
 * Starts an overlapped receive on async_socket into the caller-supplied buffers.
 *
 * Parameters:
 *   async_socket                - socket to receive on; must not be NULL and must be in the OPEN state.
 *   payload                     - array of buffer_count buffers; each entry needs a non-NULL buffer and a non-zero length.
 *   buffer_count                - number of entries in payload; must not be 0.
 *   on_receive_complete         - completion callback stored in the IO context; must not be NULL.
 *   on_receive_complete_context - opaque value stored next to the callback; may be NULL.
 *
 * Returns:
 *   0 when WSARecv returned 0 or failed with WSA_IO_PENDING (the receive is in flight).
 *   MU_FAILURE for invalid arguments, an overflowing sum of buffer lengths, a socket that is
 *   not OPEN, an allocation/CreateEvent failure or any other WSARecv failure.
 *
 * Resource handling:
 *   On every failure after allocation, the event handle (if created) is closed and the IO
 *   context is freed here. On success neither is released in this function.
 *   NOTE(review): presumably the threadpool IO completion callback releases them - that code
 *   is not visible in this block, confirm against it.
 *
 *   pending_api_calls is incremented before the state check and decremented (followed by
 *   WakeByAddressSingle) on every path that incremented it.
 *   NOTE(review): presumably this lets a concurrent close wait for in-flight API calls - the
 *   waiter is not visible here.
 */
int async_socket_receive_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_BUFFER* payload, uint32_t buffer_count, ON_ASYNC_SOCKET_RECEIVE_COMPLETE on_receive_complete, void* on_receive_complete_context)
{
    int result;

    /* Codes_SRS_ASYNC_SOCKET_WIN32_01_076: [ on_receive_complete_context shall be allowed to be NULL. ]*/
    if (
        /* Codes_SRS_ASYNC_SOCKET_WIN32_01_073: [ If async_socket is NULL, async_socket_receive_async shall fail and return a non-zero value. ]*/
        (async_socket == NULL) ||
        /* Codes_SRS_ASYNC_SOCKET_WIN32_01_074: [ If buffers is NULL, async_socket_receive_async shall fail and return a non-zero value. ]*/
        (payload == NULL) ||
        /* Codes_SRS_ASYNC_SOCKET_WIN32_01_086: [ If buffer_count is 0, async_socket_receive_async shall fail and return a non-zero value. ]*/
        (buffer_count == 0) ||
        /* Codes_SRS_ASYNC_SOCKET_WIN32_01_075: [ If on_receive_complete is NULL, async_socket_receive_async shall fail and return a non-zero value. ]*/
        (on_receive_complete == NULL)
        )
    {
        LogError("Invalid arguments: ASYNC_SOCKET_HANDLE async_socket=%p, const ASYNC_SOCKET_BUFFER* payload=%p, uint32_t buffer_count=%" PRIu32 ", ON_ASYNC_SOCKET_RECEIVE_COMPLETE on_receive_complete=%p, void*, on_receive_complete_context=%p",
            async_socket, payload, buffer_count, on_receive_complete, on_receive_complete_context);
        result = MU_FAILURE;
    }
    else
    {
        uint32_t i;
        uint32_t total_buffer_bytes = 0;

        /* Validate every buffer and accumulate the total size. The loop breaks early on the
           first bad entry, which leaves i < buffer_count as the failure indicator below. */
        for (i = 0; i < buffer_count; i++)
        {
            if (
                /* Codes_SRS_ASYNC_SOCKET_WIN32_01_091: [ If any of the buffers in payload has buffer set to NULL, async_socket_receive_async shall fail and return a non-zero value. ]*/
                (payload[i].buffer == NULL) ||
                /* Codes_SRS_ASYNC_SOCKET_WIN32_01_092: [ If any of the buffers in payload has length set to 0, async_socket_receive_async shall fail and return a non-zero value. ]*/
                (payload[i].length == 0)
                )
            {
                LogError("Invalid buffer %" PRIu32 ": buffer=%p, length = %" PRIu32, i, payload[i].buffer, payload[i].length);
                break;
            }

            /* Unsigned addition wraps, so a sum smaller than the running total means overflow. */
            if (total_buffer_bytes + payload[i].length < total_buffer_bytes)
            {
                /* Codes_SRS_ASYNC_SOCKET_WIN32_01_096: [ If the sum of buffer lengths for all the buffers in payload is greater than UINT32_MAX, async_socket_receive_async shall fail and return a non-zero value. ]*/
                LogError("Overflow in total buffer length computation total_buffer_bytes=%" PRIu32 " + payload[i=%" PRIu32 "].length=%" PRIu32 "", total_buffer_bytes, i, payload[i].length);
                break;
            }
            else
            {
                total_buffer_bytes += payload[i].length;
            }
        }

        if (i < buffer_count)
        {
            LogError("Invalid buffers passed to async_socket_receive_async");
            result = MU_FAILURE;
        }
        else
        {
            /* Register this call as in flight before looking at the state; it is unregistered
               on every path below. */
            (void)InterlockedIncrement(&async_socket->pending_api_calls);

            ASYNC_SOCKET_WIN32_STATE current_state;
            /* InterlockedAdd with 0 is used as an atomic read of the state. */
            if ((current_state = InterlockedAdd(&async_socket->state, 0)) != (LONG)ASYNC_SOCKET_WIN32_STATE_OPEN)
            {
                /* Codes_SRS_ASYNC_SOCKET_WIN32_01_098: [ If async_socket is not OPEN, async_socket_receive_async shall fail and return a non-zero value. ]*/
                LogWarning("Not open, current state is %" PRI_MU_ENUM "", MU_ENUM_VALUE(ASYNC_SOCKET_WIN32_STATE, current_state));
                result = MU_FAILURE;
            }
            else
            {
                /* Codes_SRS_ASYNC_SOCKET_WIN32_01_077: [ Otherwise async_socket_receive_async shall create a context for the send where the payload, on_receive_complete and on_receive_complete_context shall be stored. ]*/
                /* Codes_SRS_ASYNC_SOCKET_WIN32_01_078: [ The context shall also allocate enough memory to keep an array of buffer_count WSABUF items. ]*/
                ASYNC_SOCKET_IO_CONTEXT* receive_context = malloc_flex(sizeof(ASYNC_SOCKET_IO_CONTEXT), buffer_count, sizeof(WSABUF));
                if (receive_context == NULL)
                {
                    /* Codes_SRS_ASYNC_SOCKET_WIN32_01_084: [ If any error occurs, async_socket_receive_async shall fail and return a non-zero value. ]*/
                    LogError("failure in malloc_flex(sizeof(ASYNC_SOCKET_IO_CONTEXT)=%zu, buffer_count=%" PRIu32 ", sizeof(WSABUF)=%zu",
                        sizeof(ASYNC_SOCKET_IO_CONTEXT), buffer_count, sizeof(WSABUF));
                    result = MU_FAILURE;
                }
                else
                {
                    receive_context->total_buffer_bytes = total_buffer_bytes;

                    /* Codes_SRS_ASYNC_SOCKET_WIN32_01_079: [ async_socket_receive_async shall set the WSABUF items to point to the memory/length of the buffers in payload. ]*/
                    for (i = 0; i < buffer_count; i++)
                    {
                        receive_context->wsa_buffers[i].buf = payload[i].buffer;
                        receive_context->wsa_buffers[i].len = payload[i].length;
                    }

                    (void)memset(&receive_context->overlapped, 0, sizeof(receive_context->overlapped));

                    /* Codes_SRS_ASYNC_SOCKET_WIN32_01_080: [ An event to be used for the OVERLAPPED structure passed to WSARecv shall be created and stored in the context. ]*/
                    receive_context->overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
                    if (receive_context->overlapped.hEvent == NULL)
                    {
                        /* Codes_SRS_ASYNC_SOCKET_WIN32_01_084: [ If any error occurs, async_socket_receive_async shall fail and return a non-zero value. ]*/
                        LogLastError("CreateEvent failed");
                        result = MU_FAILURE;
                    }
                    else
                    {
                        int wsa_receive_result;
                        int wsa_last_error;
                        DWORD flags = 0;

                        receive_context->io_type = ASYNC_SOCKET_IO_TYPE_RECEIVE;
                        receive_context->io.receive.on_receive_complete = on_receive_complete;
                        receive_context->io.receive.on_receive_complete_context = on_receive_complete_context;

                        /* StartThreadpoolIo has to precede the overlapped call; each failure
                           branch below undoes it with CancelThreadpoolIo. */
                        /* Codes_SRS_ASYNC_SOCKET_WIN32_01_081: [ An asynchronous IO shall be started by calling StartThreadpoolIo. ]*/
                        StartThreadpoolIo(async_socket->tp_io);

#ifdef ENABLE_SOCKET_LOGGING
                        LogVerbose("Starting receive at %lf", timer_global_get_elapsed_us());
#endif

                        /* Codes_SRS_ASYNC_SOCKET_WIN32_01_082: [ A receive shall be started for the WSABUF array associated with the context calling WSARecv and passing to it the OVERLAPPED structure with the event that was just created, dwFlags set to 0, lpNumberOfBytesSent set to NULL and lpCompletionRoutine set to NULL. ]*/
                        wsa_receive_result = WSARecv((SOCKET)async_socket->socket_handle, receive_context->wsa_buffers, buffer_count, NULL, &flags, &receive_context->overlapped, NULL);

                        if ((wsa_receive_result != 0) && (wsa_receive_result != SOCKET_ERROR))
                        {
                            /* Codes_SRS_ASYNC_SOCKET_WIN32_01_105: [ If WSARecv fails with any other error, async_socket_receive_async shall call CancelThreadpoolIo and return a non-zero value. ]*/
                            LogLastError("WSARecv failed with %d", wsa_receive_result);
                            CancelThreadpoolIo(async_socket->tp_io);
                            result = MU_FAILURE;
                        }
                        /* Codes_SRS_ASYNC_SOCKET_WIN32_01_054: [ If WSARecv fails with SOCKET_ERROR, async_socket_receive_async shall call WSAGetLastError. ]*/
                        /* Codes_SRS_ASYNC_SOCKET_WIN32_01_055: [ If WSAGetLastError returns IO_PENDING, it shall be not treated as an error. ]*/
                        else if ((wsa_receive_result == SOCKET_ERROR) && ((wsa_last_error = WSAGetLastError()) != WSA_IO_PENDING))
                        {
                            /* Codes_SRS_ASYNC_SOCKET_WIN32_01_084: [ If any error occurs, async_socket_receive_async shall fail and return a non-zero value. ]*/
                            /* wsa_last_error is an int, so it is formatted with %d (a %lu here
                               would not match the argument type). */
                            LogLastError("WSARecv failed with %d, WSAGetLastError returned %d", wsa_receive_result, wsa_last_error);
                            /* Codes_SRS_ASYNC_SOCKET_WIN32_01_099: [ If WSAGetLastError returns any other error, async_socket_receive_async shall call CancelThreadpoolIo. ]*/
                            CancelThreadpoolIo(async_socket->tp_io);
                            result = MU_FAILURE;
                        }
                        else
                        {
                            /* The receive is in flight (WSARecv returned 0 or WSA_IO_PENDING).
                               receive_context is not touched past this point and the cleanup
                               below is skipped via the goto. */
                            (void)InterlockedDecrement(&async_socket->pending_api_calls);
                            WakeByAddressSingle((PVOID)&async_socket->pending_api_calls);

                            /* Codes_SRS_ASYNC_SOCKET_WIN32_01_083: [ On success, async_socket_receive_async shall return 0. ]*/
                            result = 0;

                            goto all_ok;
                        }

                        /* Only reached when WSARecv failed: release the event created above. */
                        if (!CloseHandle(receive_context->overlapped.hEvent))
                        {
                            LogLastError("CloseHandle failed");
                        }
                    }

                    /* Reached on CreateEvent failure or WSARecv failure. */
                    free(receive_context);
                }
            }

            /* Failure paths: unregister the in-flight call and wake a waiter on the counter. */
            (void)InterlockedDecrement(&async_socket->pending_api_calls);
            WakeByAddressSingle((PVOID)&async_socket->pending_api_calls);
        }
    }

all_ok:
    return result;
}