in sdk/greengrass/event-stream-rpc-server/src/main/java/software/amazon/awssdk/eventstreamrpc/ServiceOperationMappingContinuationHandler.java [71:142]
/**
 * Handles a connect request: compares the client's version header with the server
 * version, runs the service's authentication and authorization handlers, and then
 * always answers with a {@code ConnectAck} protocol message.
 *
 * <p>The "connection accepted" flag is set on the response only when the version
 * matches and the authorization decision is {@code ACCEPT}. In every other outcome
 * (version mismatch, rejection, or any exception raised while authenticating or
 * authorizing) the response is sent without that flag, and the connection is closed
 * once the send completes.
 *
 * @param headers headers of the connect message; searched for the version header and
 *                handed to the authentication handler
 * @param payload payload of the connect message; handed to the authentication handler
 */
protected void onConnectRequest(List<Header> headers, byte[] payload) {
    // Single-element array so the send-completion callback can read the final flag value.
    final int[] ackFlags = { 0 };
    final MessageType ackType = MessageType.ConnectAck;

    final AuthenticationHandler authNHandler = serviceHandler.getAuthenticationHandler();
    final AuthorizationHandler authZHandler = serviceHandler.getAuthorizationHandler();

    try {
        final Optional<String> clientVersion = headers.stream()
                .filter(h -> h.getHeaderType() == HeaderType.String)
                .filter(h -> h.getName().equals(EventStreamRPCServiceModel.VERSION_HEADER))
                .map(h -> h.getValueAsString())
                .findFirst();

        final boolean versionMatches = clientVersion.isPresent()
                && Version.fromString(clientVersion.get()).equals(Version.getInstance());
        if (!versionMatches) {
            LOGGER.warn(String.format("Client version {%s} mismatches server version {%s}",
                    clientVersion.orElse("null"),
                    Version.getInstance().getVersionString()));
            // The finally block below still sends the (non-accepting) response.
            return;
        }

        // Version matches: both handlers must be configured before proceeding.
        if (authNHandler == null) {
            throw new IllegalStateException(
                    String.format("%s has null authentication handler!", serviceHandler.getServiceName()));
        }
        if (authZHandler == null) {
            throw new IllegalStateException(
                    String.format("%s has null authorization handler!", serviceHandler.getServiceName()));
        }

        LOGGER.trace(String.format("%s running authentication handler", serviceHandler.getServiceName()));
        authenticationData = authNHandler.apply(headers, payload);
        if (authenticationData == null) {
            throw new IllegalStateException(
                    String.format("%s authentication handler returned null", serviceHandler.getServiceName()));
        }
        LOGGER.info(String.format("%s authenticated identity: %s",
                serviceHandler.getServiceName(), authenticationData.getIdentityLabel()));

        final Authorization decision = authZHandler.apply(authenticationData);
        switch (decision) {
            case ACCEPT:
                LOGGER.info("Connection accepted for " + authenticationData.getIdentityLabel());
                ackFlags[0] = MessageFlags.ConnectionAccepted.getByteValue();
                break;
            case REJECT:
                LOGGER.info("Connection rejected for: " + authenticationData.getIdentityLabel());
                break;
            default:
                // Reaching here means a new Authorization value was added without updating this switch.
                throw new RuntimeException(
                        "Unknown authorization decision for " + authenticationData.getIdentityLabel());
        }
    } catch (Exception e) {
        // Any failure leaves the accepted flag unset, so the client receives a non-accepting response.
        LOGGER.error(String.format("%s occurred while attempting to authN/authZ connect: %s",
                e.getClass(), e.getMessage()), e);
    } finally {
        final String identityLabel;
        if (authenticationData != null) {
            identityLabel = authenticationData.getIdentityLabel();
        } else {
            identityLabel = "null";
        }
        LOGGER.info("Sending connect response for " + identityLabel);

        connection.sendProtocolMessage(null, null, ackType, ackFlags[0])
                .whenComplete((result, error) -> {
                    // TODO: no logging inside this callback (neither for send success nor
                    // failure) due to a known issue of locking up.
                    final boolean accepted =
                            ackFlags[0] == MessageFlags.ConnectionAccepted.getByteValue();
                    if (!accepted) {
                        // Connection was not accepted, so close it after responding.
                        connection.closeConnection(0);
                    }
                });
    }
}