in gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java [81:197]
/**
 * Drives the SASL handshake for the channel and gates every inbound {@link RequestMessage} behind it.
 * <p>
 * The flow, as implemented below:
 * <ol>
 *   <li>Anything that is not a {@code RequestMessage} closes the channel.</li>
 *   <li>The first request on a channel (no negotiator stored yet) is remembered and answered with an
 *       {@code AUTHENTICATE} challenge that carries no data.</li>
 *   <li>While a negotiation is pending, non-authentication requests are queued on the channel, but only
 *       within {@code MAX_REQUEST_DEFERRABLE_DURATION} of the first queued request; later ones are
 *       rejected with {@code UNAUTHORIZED} and are <em>not</em> queued.</li>
 *   <li>Authentication requests must carry a base64 encoded String under {@code Tokens.ARGS_SASL}; it is
 *       decoded and handed to the negotiator, which either yields another challenge or completes.</li>
 *   <li>On completion the authenticated user is stored on the channel, this handler removes itself from
 *       the pipeline and the remembered request plus all queued requests are fired down the pipeline.</li>
 * </ol>
 *
 * @param ctx the context of this handler, used to reach channel attributes, write responses and forward
 *            messages
 * @param msg the inbound message; expected to be a {@link RequestMessage}
 * @throws Exception declared by the handler contract; authentication failures are answered in-band and
 *                   not rethrown
 */
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
    if (!(msg instanceof RequestMessage)) {
        logger.warn("{} only processes RequestMessage instances - received {} - channel closing",
                this.getClass().getSimpleName(), msg.getClass());
        ctx.close();
        return;
    }

    final RequestMessage requestMessage = (RequestMessage) msg;

    // Per-channel state: the running negotiation, the request that triggered it, and anything queued meanwhile.
    final Attribute<Authenticator.SaslNegotiator> negotiator = ctx.channel().attr(StateKey.NEGOTIATOR);
    final Attribute<RequestMessage> request = ctx.channel().attr(StateKey.REQUEST_MESSAGE);
    final Attribute<Pair<LocalDateTime, List<RequestMessage>>> deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);

    if (negotiator.get() == null) {
        try {
            // First time through so save the request and send an AUTHENTICATE challenge with no data
            negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
            request.set(requestMessage);
            final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
                    .code(ResponseStatusCode.AUTHENTICATE).create();
            ctx.writeAndFlush(authenticate);
        } catch (Exception ex) {
            // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
            // to hang until timeout which isn't so nice. treating this like a server error as it means that
            // the Authenticator isn't really ready to deal with requests for some reason.
            logger.error("{} is not ready to handle requests - check its configuration or related services",
                    authenticator.getClass().getSimpleName(), ex);
            respondWithError(
                    requestMessage,
                    builder -> builder.statusMessage("Authenticator is not ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
                    ctx);
        }
        return;
    }

    if (!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
        // Authentication negotiation is pending: non-authentication requests are parked for later processing.
        // The timestamp of the pair is fixed by the first parked request and bounds how long parking is allowed.
        deferredRequests.setIfAbsent(new ImmutablePair<>(LocalDateTime.now(), new ArrayList<>()));
        final Pair<LocalDateTime, List<RequestMessage>> deferred = deferredRequests.get();

        final Duration deferredDuration = Duration.between(deferred.getKey(), LocalDateTime.now());
        if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 0) {
            // Reject WITHOUT queueing: a request that has already been answered with an error must not be
            // replayed down the pipeline once authentication eventually succeeds.
            respondWithError(
                    requestMessage,
                    builder -> builder.statusMessage("Authentication did not finish in the allowed duration (" + MAX_REQUEST_DEFERRABLE_DURATION.getSeconds() + "s).")
                            .code(ResponseStatusCode.UNAUTHORIZED),
                    ctx);
            return;
        }

        deferred.getValue().add(requestMessage);
        return;
    }

    if (!requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
        // This is an authentication request that is missing a "sasl" argument.
        respondWithError(
                requestMessage,
                builder -> builder.statusMessage("Failed to authenticate").code(ResponseStatusCode.UNAUTHORIZED),
                ctx);
        return;
    }

    final Object saslObject = requestMessage.getArgs().get(Tokens.ARGS_SASL);
    if (!(saslObject instanceof String)) {
        respondWithError(
                requestMessage,
                builder -> builder
                        .statusMessage("Incorrect type for : " + Tokens.ARGS_SASL + " - base64 encoded String is expected")
                        .code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST),
                ctx);
        return;
    }

    final byte[] saslResponse;
    try {
        saslResponse = BASE64_DECODER.decode((String) saslObject);
    } catch (IllegalArgumentException iae) {
        // Decoding can reject the payload with an unchecked exception (presumably BASE64_DECODER is a
        // java.util.Base64.Decoder - confirm). Answer in-band, like the wrong-type case above, rather than
        // letting the exception escape the handler.
        respondWithError(
                requestMessage,
                builder -> builder
                        .statusMessage("Incorrect value for : " + Tokens.ARGS_SASL + " - base64 encoded String is expected")
                        .code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST),
                ctx);
        return;
    }

    try {
        final byte[] saslMessage = negotiator.get().evaluateResponse(saslResponse);
        if (!negotiator.get().isComplete()) {
            // not done here - send back the sasl message for next challenge.
            final HashMap<String, Object> metadata = new HashMap<>();
            metadata.put(Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(saslMessage));
            final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
                    .statusAttributes(metadata)
                    .code(ResponseStatusCode.AUTHENTICATE).create();
            ctx.writeAndFlush(authenticate);
            return;
        }

        final org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser user = negotiator.get().getAuthenticatedUser();
        ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);

        // User name logged with the remote socket address and authenticator classname for audit logging
        if (settings.enableAuditLog) {
            String address = ctx.channel().remoteAddress().toString();
            // drop the leading "/" from the address text when there is something after it
            if (address.startsWith("/") && address.length() > 1) {
                address = address.substring(1);
            }
            final String[] authClassParts = authenticator.getClass().toString().split("[.]");
            auditLogger.info("User {} with address {} authenticated by {}",
                    user.getName(), address, authClassParts[authClassParts.length - 1]);
        }

        // If we have got here we are authenticated so remove the handler and pass
        // the original message down the pipeline for processing
        ctx.pipeline().remove(this);
        final RequestMessage original = request.get();
        ctx.fireChannelRead(original);

        // Also send deferred requests if there are any down the pipeline for processing
        if (deferredRequests.get() != null) {
            deferredRequests.getAndSet(null).getValue().forEach(ctx::fireChannelRead);
        }
    } catch (AuthenticationException ae) {
        // Drop the failed negotiation so the next request on this channel starts a fresh handshake.
        negotiator.set(null);
        respondWithError(
                requestMessage,
                builder -> builder.statusMessage(ae.getMessage()).code(ResponseStatusCode.UNAUTHORIZED),
                ctx);
    }
}