public CompletableFuture connect()

in sdk/greengrass/event-stream-rpc-client/src/main/java/software/amazon/awssdk/eventstreamrpc/EventStreamRPCConnection.java [88:227]


    public CompletableFuture<Void> connect(final LifecycleHandler lifecycleHandler) {
        synchronized (connectionState) {
            if (connectionState.connectionPhase == ConnectionState.Phase.DISCONNECTED) {
                connectionState.connectionPhase = ConnectionState.Phase.CONNECTING_SOCKET;
                connectionState.onConnectCalled = false;
            } else {
                throw new IllegalStateException("Connection is already established");
            }
        }
        final CompletableFuture<Void> initialConnectFuture = new CompletableFuture<>();

        ClientConnection.connect(config.getHost(), config.getPort(), config.getSocketOptions(),
                config.getTlsContext(), config.getClientBootstrap(), new ClientConnectionHandler() {
                    @Override
                    protected void onConnectionSetup(final ClientConnection clientConnection, int errorCode) {
                        LOGGER.info(String.format("Socket connection %s:%d to server result [%s]",
                                config.getHost(), config.getPort(), CRT.awsErrorName(errorCode)));
                        synchronized (connectionState) {
                            connectionState.connection = clientConnection;
                            if (CRT.AWS_CRT_SUCCESS != errorCode) {
                                connectionState.connectionPhase = ConnectionState.Phase.DISCONNECTED;
                                initialConnectFuture.completeExceptionally(new CrtRuntimeException(errorCode, CRT.awsErrorName(errorCode)));
                            } else if (connectionState.connectionPhase == ConnectionState.Phase.CLOSING) {
                                connectionState.closeReason = new EventStreamClosedException("Event stream closed by client");
                                disconnect();
                            } else {
                                connectionState.connectionPhase = ConnectionState.Phase.WAITING_CONNACK;
                                    config.getConnectMessageAmender().get()
                                        .whenComplete((messageAmendInfo, ex) -> {
                                            synchronized (connectionState) {
                                                if (clientConnection != connectionState.connection) {
                                                    LOGGER.warning("MessageAmender completed with different connection than initial");
                                                    return;
                                                }
                                                if (connectionState.connectionPhase == ConnectionState.Phase.CLOSING) {
                                                    connectionState.closeReason = new EventStreamClosedException("Event stream closed by client");
                                                } else {
                                                    try {
                                                        final List<Header> headers = new ArrayList<>(messageAmendInfo.getHeaders().size() + 1);
                                                        headers.add(Header.createHeader(EventStreamRPCServiceModel.VERSION_HEADER,
                                                                getVersionString()));
                                                        headers.addAll(messageAmendInfo.getHeaders().stream()
                                                                .filter(header -> !header.getName().equals(EventStreamRPCServiceModel.VERSION_HEADER))
                                                                .collect(Collectors.toList()));

                                                        LOGGER.fine("Waiting for connect ack message back from event stream RPC server");
                                                        clientConnection.sendProtocolMessage(headers,
                                                                messageAmendInfo.getPayload(), MessageType.Connect, 0);
                                                    } catch (Exception e) {
                                                        connectionState.connectionPhase = ConnectionState.Phase.CLOSING;
                                                        connectionState.closeReason = e;
                                                        disconnect();
                                                    }
                                                }
                                            }
                                        });
                            }
                        }
                    }

                    @Override
                    protected void onProtocolMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) {
                        if (MessageType.ConnectAck.equals(messageType)) {
                            synchronized (connectionState) {
                                if ((messageFlags & MessageFlags.ConnectionAccepted.getByteValue()) != 0) {
                                    connectionState.connectionPhase = ConnectionState.Phase.CONNECTED;
                                    //now the client is open for business to invoke operations
                                    LOGGER.info("Connection established with event stream RPC server");
                                    if (!initialConnectFuture.isDone()) {
                                        initialConnectFuture.complete(null);
                                    }
                                    connectionState.onConnectCalled = true;
                                    doOnConnect(lifecycleHandler);
                                } else {
                                    //This is access denied, implied due to not having ConnectionAccepted msg flag
                                    LOGGER.warning("AccessDenied to event stream RPC server");
                                    connectionState.connectionPhase = ConnectionState.Phase.CLOSING;
                                    connectionState.connection.closeConnection(0);

                                    final AccessDeniedException ade = new AccessDeniedException("Connection access denied to event stream RPC server");
                                    if (!initialConnectFuture.isDone()) {
                                        initialConnectFuture.completeExceptionally(ade);
                                    }
                                    doOnError(lifecycleHandler, ade);
                                }
                            }
                        } else if (MessageType.PingResponse.equals(messageType)) {
                            LOGGER.finer("Ping response received");
                        } else if (MessageType.Ping.equals(messageType)) {
                            sendPingResponse(Optional.of(new MessageAmendInfo(
                                    headers.stream().filter(header -> !header.getName().startsWith(":"))
                                    .collect(Collectors.toList()), payload)))
                                .whenComplete((res, ex) -> {
                                    LOGGER.finer("Ping response sent");
                                });
                        } else if (MessageType.Connect.equals(messageType)) {
                            LOGGER.severe("Erroneous connect message type received by client. Closing");
                            //TODO: client sends protocol error here?
                            disconnect();
                        } else if (MessageType.ProtocolError.equals(messageType) || MessageType.ServerError.equals(messageType)) {
                            LOGGER.severe("Received " + messageType.name() + ": " + CRT.awsErrorName(CRT.awsLastError()));
                            connectionState.closeReason = EventStreamError.create(headers, payload, messageType);
                            doOnError(lifecycleHandler, connectionState.closeReason);
                            disconnect();
                        } else {
                            LOGGER.severe("Unprocessed message type: " + messageType.name());
                            doOnError(lifecycleHandler, new EventStreamError("Unprocessed message type: " + messageType.name()));
                        }
                    }

                    @Override
                    protected void onConnectionClosed(int errorCode) {
                        LOGGER.finer("Socket connection closed: " + CRT.awsErrorName(errorCode));
                        Throwable closeReason;
                        boolean callOnDisconnect;
                        synchronized (connectionState) {
                            if (connectionState.connection != null) {
                                connectionState.connection.close();
                                connectionState.connection = null;
                            }
                            connectionState.connectionPhase = ConnectionState.Phase.DISCONNECTED;
                            closeReason = connectionState.closeReason;
                            connectionState.closeReason = null;
                            callOnDisconnect = connectionState.onConnectCalled;
                            connectionState.onConnectCalled = false;
                        }
                        if (!initialConnectFuture.isDone()) {
                            if (closeReason != null) {
                                initialConnectFuture.completeExceptionally(closeReason);
                            } else {
                                initialConnectFuture.complete(null);
                            }
                        }
                        if (callOnDisconnect) {
                            doOnDisconnect(lifecycleHandler, errorCode);
                        }
                    }
                });
        return initialConnectFuture;
    }