int aws_socket_connect()

in source/posix/socket.c [569:747]


int aws_socket_connect(
    struct aws_socket *socket,
    const struct aws_socket_endpoint *remote_endpoint,
    struct aws_event_loop *event_loop,
    aws_socket_on_connection_result_fn *on_connection_result,
    void *user_data) {
    AWS_ASSERT(event_loop);
    AWS_ASSERT(!socket->event_loop);

    AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd);

    if (socket->event_loop) {
        return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
    }

    if (socket->options.type != AWS_SOCKET_DGRAM) {
        AWS_ASSERT(on_connection_result);
        if (socket->state != INIT) {
            return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
        }
    } else { /* UDP socket */
        /* UDP sockets jump to CONNECT_READ if bind is called first */
        if (socket->state != CONNECTED_READ && socket->state != INIT) {
            return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
        }
    }

    size_t address_strlen;
    if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) {
        return AWS_OP_ERR;
    }

    struct socket_address address;
    AWS_ZERO_STRUCT(address);
    socklen_t sock_size = 0;
    int pton_err = 1;
    if (socket->options.domain == AWS_SOCKET_IPV4) {
        pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr);
        address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port);
        address.sock_addr_types.addr_in.sin_family = AF_INET;
        sock_size = sizeof(address.sock_addr_types.addr_in);
    } else if (socket->options.domain == AWS_SOCKET_IPV6) {
        pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr);
        address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port);
        address.sock_addr_types.addr_in6.sin6_family = AF_INET6;
        sock_size = sizeof(address.sock_addr_types.addr_in6);
    } else if (socket->options.domain == AWS_SOCKET_LOCAL) {
        address.sock_addr_types.un_addr.sun_family = AF_UNIX;
        strncpy(address.sock_addr_types.un_addr.sun_path, remote_endpoint->address, AWS_ADDRESS_MAX_LEN);
        sock_size = sizeof(address.sock_addr_types.un_addr);
#ifdef USE_VSOCK
    } else if (socket->options.domain == AWS_SOCKET_VSOCK) {
        pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid);
        address.sock_addr_types.vm_addr.svm_family = AF_VSOCK;
        address.sock_addr_types.vm_addr.svm_port = (unsigned int)remote_endpoint->port;
        sock_size = sizeof(address.sock_addr_types.vm_addr);
#endif
    } else {
        AWS_ASSERT(0);
        return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
    }

    if (pton_err != 1) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: failed to parse address %s:%d.",
            (void *)socket,
            socket->io_handle.data.fd,
            remote_endpoint->address,
            (int)remote_endpoint->port);
        return aws_raise_error(s_convert_pton_error(pton_err));
    }

    AWS_LOGF_DEBUG(
        AWS_LS_IO_SOCKET,
        "id=%p fd=%d: connecting to endpoint %s:%d.",
        (void *)socket,
        socket->io_handle.data.fd,
        remote_endpoint->address,
        (int)remote_endpoint->port);

    socket->state = CONNECTING;
    socket->remote_endpoint = *remote_endpoint;
    socket->connect_accept_user_data = user_data;
    socket->connection_result_fn = on_connection_result;

    struct posix_socket *socket_impl = socket->impl;

    socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args));
    if (!socket_impl->connect_args) {
        return AWS_OP_ERR;
    }

    socket_impl->connect_args->socket = socket;
    socket_impl->connect_args->allocator = socket->allocator;

    socket_impl->connect_args->task.fn = s_handle_socket_timeout;
    socket_impl->connect_args->task.arg = socket_impl->connect_args;

    int error_code = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size);
    socket->event_loop = event_loop;

    if (!error_code) {
        AWS_LOGF_INFO(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: connected immediately, not scheduling timeout.",
            (void *)socket,
            socket->io_handle.data.fd);
        socket_impl->connect_args->task.fn = s_run_connect_success;
        /* the subscription for IO will happen once we setup the connection in the task. Since we already
         * know the connection succeeded, we don't need to register for events yet. */
        aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task);
    }

    if (error_code) {
        error_code = errno;
        if (error_code == EINPROGRESS || error_code == EALREADY) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.",
                (void *)socket,
                socket->io_handle.data.fd);
            /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately
             * and null out the connect args */
            struct aws_task *timeout_task = &socket_impl->connect_args->task;

            socket_impl->currently_subscribed = true;
            /* This event is for when the connection finishes. (the fd will flip writable). */
            if (aws_event_loop_subscribe_to_io_events(
                    event_loop,
                    &socket->io_handle,
                    AWS_IO_EVENT_TYPE_WRITABLE,
                    s_socket_connect_event,
                    socket_impl->connect_args)) {
                AWS_LOGF_ERROR(
                    AWS_LS_IO_SOCKET,
                    "id=%p fd=%d: failed to register with event-loop %p.",
                    (void *)socket,
                    socket->io_handle.data.fd,
                    (void *)event_loop);
                socket_impl->currently_subscribed = false;
                socket->event_loop = NULL;
                goto err_clean_up;
            }

            /* schedule a task to run at the connect timeout interval, if this task runs before the connect
             * happens, we consider that a timeout. */
            uint64_t timeout = 0;
            aws_event_loop_current_clock_time(event_loop, &timeout);
            timeout += aws_timestamp_convert(
                socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
            AWS_LOGF_TRACE(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: scheduling timeout task for %llu.",
                (void *)socket,
                socket->io_handle.data.fd,
                (unsigned long long)timeout);
            aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout);
        } else {
            AWS_LOGF_ERROR(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: connect failed with error code %d.",
                (void *)socket,
                socket->io_handle.data.fd,
                error_code);
            int aws_error = s_determine_socket_error(error_code);
            aws_raise_error(aws_error);
            socket->event_loop = NULL;
            socket_impl->currently_subscribed = false;
            goto err_clean_up;
        }
    }
    return AWS_OP_SUCCESS;

err_clean_up:
    aws_mem_release(socket->allocator, socket_impl->connect_args);
    socket_impl->connect_args = NULL;
    return AWS_OP_ERR;
}