in sdk/greengrass/event-stream-rpc-client/src/main/java/software/amazon/awssdk/eventstreamrpc/EventStreamRPCConnection.java [88:227]
/**
 * Starts an asynchronous connection attempt to the event stream RPC server described by {@code config}.
 *
 * <p>The connection phase is moved from {@code DISCONNECTED} to {@code CONNECTING_SOCKET}, the socket is
 * opened, the (amended) Connect message is sent, and the phase becomes {@code CONNECTED} once a ConnectAck
 * carrying the ConnectionAccepted flag is received.
 *
 * @param lifecycleHandler handler handed to {@code doOnConnect}, {@code doOnError} and
 *                         {@code doOnDisconnect} as the connection progresses
 * @return a future that completes normally when the ConnectAck is accepted, and exceptionally when the
 *         socket setup fails, access is denied, or the connection closes with a recorded close reason
 *         before the future was completed
 * @throws IllegalStateException if the connection phase is anything other than {@code DISCONNECTED}
 */
public CompletableFuture<Void> connect(final LifecycleHandler lifecycleHandler) {
    synchronized (connectionState) {
        if (connectionState.connectionPhase == ConnectionState.Phase.DISCONNECTED) {
            connectionState.connectionPhase = ConnectionState.Phase.CONNECTING_SOCKET;
            connectionState.onConnectCalled = false;
        } else {
            throw new IllegalStateException("Connection is already established");
        }
    }
    final CompletableFuture<Void> initialConnectFuture = new CompletableFuture<>();
    ClientConnection.connect(config.getHost(), config.getPort(), config.getSocketOptions(),
            config.getTlsContext(), config.getClientBootstrap(), new ClientConnectionHandler() {
                /**
                 * Socket-level result. On success, asks the connect message amender for extra
                 * headers/payload and sends the Connect message; on failure, fails the connect future.
                 */
                @Override
                protected void onConnectionSetup(final ClientConnection clientConnection, int errorCode) {
                    LOGGER.info(String.format("Socket connection %s:%d to server result [%s]",
                            config.getHost(), config.getPort(), CRT.awsErrorName(errorCode)));
                    synchronized (connectionState) {
                        connectionState.connection = clientConnection;
                        if (CRT.AWS_CRT_SUCCESS != errorCode) {
                            connectionState.connectionPhase = ConnectionState.Phase.DISCONNECTED;
                            initialConnectFuture.completeExceptionally(
                                    new CrtRuntimeException(errorCode, CRT.awsErrorName(errorCode)));
                        } else if (connectionState.connectionPhase == ConnectionState.Phase.CLOSING) {
                            // The phase was switched to CLOSING while the socket was still being set up.
                            connectionState.closeReason = new EventStreamClosedException("Event stream closed by client");
                            disconnect();
                        } else {
                            connectionState.connectionPhase = ConnectionState.Phase.WAITING_CONNACK;
                            config.getConnectMessageAmender().get()
                                    .whenComplete((messageAmendInfo, ex) -> {
                                        synchronized (connectionState) {
                                            // Ignore a late completion that belongs to an earlier connection.
                                            if (clientConnection != connectionState.connection) {
                                                LOGGER.warning("MessageAmender completed with different connection than initial");
                                                return;
                                            }
                                            if (connectionState.connectionPhase == ConnectionState.Phase.CLOSING) {
                                                connectionState.closeReason = new EventStreamClosedException("Event stream closed by client");
                                            } else if (ex != null) {
                                                // The amender failed: messageAmendInfo is null here, so record the
                                                // real cause instead of letting a NullPointerException mask it.
                                                connectionState.connectionPhase = ConnectionState.Phase.CLOSING;
                                                connectionState.closeReason = ex;
                                                disconnect();
                                            } else {
                                                try {
                                                    final List<Header> headers = new ArrayList<>(messageAmendInfo.getHeaders().size() + 1);
                                                    // The version header is always ours; drop any amender-supplied duplicate.
                                                    headers.add(Header.createHeader(EventStreamRPCServiceModel.VERSION_HEADER,
                                                            getVersionString()));
                                                    headers.addAll(messageAmendInfo.getHeaders().stream()
                                                            .filter(header -> !header.getName().equals(EventStreamRPCServiceModel.VERSION_HEADER))
                                                            .collect(Collectors.toList()));
                                                    LOGGER.fine("Waiting for connect ack message back from event stream RPC server");
                                                    clientConnection.sendProtocolMessage(headers,
                                                            messageAmendInfo.getPayload(), MessageType.Connect, 0);
                                                } catch (Exception e) {
                                                    connectionState.connectionPhase = ConnectionState.Phase.CLOSING;
                                                    connectionState.closeReason = e;
                                                    disconnect();
                                                }
                                            }
                                        }
                                    });
                        }
                    }
                }

                /**
                 * Connection-level (non-stream) protocol messages: ConnectAck, Ping/PingResponse and
                 * error messages.
                 */
                @Override
                protected void onProtocolMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) {
                    if (MessageType.ConnectAck.equals(messageType)) {
                        synchronized (connectionState) {
                            if ((messageFlags & MessageFlags.ConnectionAccepted.getByteValue()) != 0) {
                                connectionState.connectionPhase = ConnectionState.Phase.CONNECTED;
                                //now the client is open for business to invoke operations
                                LOGGER.info("Connection established with event stream RPC server");
                                if (!initialConnectFuture.isDone()) {
                                    initialConnectFuture.complete(null);
                                }
                                // Remembered so that onConnectionClosed only reports a disconnect
                                // after a connect was reported.
                                connectionState.onConnectCalled = true;
                                doOnConnect(lifecycleHandler);
                            } else {
                                //This is access denied, implied due to not having ConnectionAccepted msg flag
                                LOGGER.warning("AccessDenied to event stream RPC server");
                                connectionState.connectionPhase = ConnectionState.Phase.CLOSING;
                                connectionState.connection.closeConnection(0);
                                final AccessDeniedException ade = new AccessDeniedException("Connection access denied to event stream RPC server");
                                if (!initialConnectFuture.isDone()) {
                                    initialConnectFuture.completeExceptionally(ade);
                                }
                                doOnError(lifecycleHandler, ade);
                            }
                        }
                    } else if (MessageType.PingResponse.equals(messageType)) {
                        LOGGER.finer("Ping response received");
                    } else if (MessageType.Ping.equals(messageType)) {
                        // Echo the payload and every header whose name does not start with ':'.
                        sendPingResponse(Optional.of(new MessageAmendInfo(
                                headers.stream().filter(header -> !header.getName().startsWith(":"))
                                        .collect(Collectors.toList()), payload)))
                                .whenComplete((res, ex) -> {
                                    if (ex != null) {
                                        LOGGER.warning("Failed to send ping response: " + ex);
                                    } else {
                                        LOGGER.finer("Ping response sent");
                                    }
                                });
                    } else if (MessageType.Connect.equals(messageType)) {
                        LOGGER.severe("Erroneous connect message type received by client. Closing");
                        //TODO: client sends protocol error here?
                        disconnect();
                    } else if (MessageType.ProtocolError.equals(messageType) || MessageType.ServerError.equals(messageType)) {
                        LOGGER.severe("Received " + messageType.name() + ": " + CRT.awsErrorName(CRT.awsLastError()));
                        final Throwable serverError = EventStreamError.create(headers, payload, messageType);
                        // closeReason is guarded by the connectionState monitor everywhere else.
                        synchronized (connectionState) {
                            connectionState.closeReason = serverError;
                        }
                        doOnError(lifecycleHandler, serverError);
                        disconnect();
                    } else {
                        LOGGER.severe("Unprocessed message type: " + messageType.name());
                        doOnError(lifecycleHandler, new EventStreamError("Unprocessed message type: " + messageType.name()));
                    }
                }

                /**
                 * Socket closed: releases the connection, resets the shared state to DISCONNECTED and
                 * settles the connect future if nothing settled it before.
                 */
                @Override
                protected void onConnectionClosed(int errorCode) {
                    LOGGER.finer("Socket connection closed: " + CRT.awsErrorName(errorCode));
                    Throwable closeReason;
                    boolean callOnDisconnect;
                    synchronized (connectionState) {
                        if (connectionState.connection != null) {
                            connectionState.connection.close();
                            connectionState.connection = null;
                        }
                        connectionState.connectionPhase = ConnectionState.Phase.DISCONNECTED;
                        // Snapshot and reset under the lock; callbacks below run outside of it.
                        closeReason = connectionState.closeReason;
                        connectionState.closeReason = null;
                        callOnDisconnect = connectionState.onConnectCalled;
                        connectionState.onConnectCalled = false;
                    }
                    if (!initialConnectFuture.isDone()) {
                        if (closeReason != null) {
                            initialConnectFuture.completeExceptionally(closeReason);
                        } else {
                            // NOTE(review): the future completes normally here although no ConnectAck
                            // was accepted (otherwise it would already be done) - confirm this is intended.
                            initialConnectFuture.complete(null);
                        }
                    }
                    if (callOnDisconnect) {
                        doOnDisconnect(lifecycleHandler, errorCode);
                    }
                }
            });
    return initialConnectFuture;
}