in interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/auth/IrAuthenticationHandler.java [88:280]
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (msg instanceof RequestMessage) {
final RequestMessage requestMessage = (RequestMessage) msg;
final Attribute<Authenticator.SaslNegotiator> negotiator =
((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
final Attribute<RequestMessage> request =
((AttributeMap) ctx).attr(StateKey.REQUEST_MESSAGE);
if (negotiator.get() == null) {
try {
// First time through so save the request and send an AUTHENTICATE challenge
// with no data
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
request.set(requestMessage);
// the authentication flag is off, just pass the original message down the
// pipeline for processing
if (!authenticator.requireAuthentication()) {
ctx.pipeline().remove(this);
final RequestMessage original = request.get();
ctx.fireChannelRead(original);
} else {
final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
.code(ResponseStatusCode.AUTHENTICATE)
.create();
ctx.writeAndFlush(authenticate);
}
} catch (Exception ex) {
// newSaslNegotiator can cause troubles - if we don't catch and respond nicely
// the driver seems
// to hang until timeout which isn't so nice. treating this like a server error
// as it means that
// the Authenticator isn't really ready to deal with requests for some reason.
logger.error(
String.format(
"%s is not ready to handle requests - check its configuration"
+ " or related services",
authenticator.getClass().getSimpleName()),
ex);
final ResponseMessage error =
ResponseMessage.build(requestMessage)
.statusMessage("Authenticator is not ready to handle requests")
.code(ResponseStatusCode.SERVER_ERROR)
.create();
ctx.writeAndFlush(error);
}
} else {
if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)
&& requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
final Object saslObject = requestMessage.getArgs().get(Tokens.ARGS_SASL);
final byte[] saslResponse;
if (saslObject instanceof String) {
saslResponse = BASE64_DECODER.decode((String) saslObject);
} else {
final ResponseMessage error =
ResponseMessage.build(request.get())
.statusMessage(
"Incorrect type for : "
+ Tokens.ARGS_SASL
+ " - base64 encoded String is expected")
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST)
.create();
ctx.writeAndFlush(error);
return;
}
try {
final byte[] saslMessage = negotiator.get().evaluateResponse(saslResponse);
if (negotiator.get().isComplete()) {
final AuthenticatedUser user = negotiator.get().getAuthenticatedUser();
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
// Username logged with the remote socket address and authenticator
// classname for audit logging
if (settings.enableAuditLog || settings.authentication.enableAuditLog) {
String address = ctx.channel().remoteAddress().toString();
if (address.startsWith("/") && address.length() > 1)
address = address.substring(1);
final String[] authClassParts =
authenticator.getClass().toString().split("[.]");
auditLogger.info(
"User {} with address {} authenticated by {}",
user.getName(),
address,
authClassParts[authClassParts.length - 1]);
}
// If we have got here we are authenticated so remove the handler and
// pass
// the original message down the pipeline for processing
ctx.pipeline().remove(this);
final RequestMessage original = request.get();
ctx.fireChannelRead(original);
} else {
// not done here - send back the sasl message for next challenge.
final Map<String, Object> metadata = new HashMap<>();
metadata.put(
Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(saslMessage));
final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
.statusAttributes(metadata)
.code(ResponseStatusCode.AUTHENTICATE)
.create();
ctx.writeAndFlush(authenticate);
}
} catch (AuthenticationException ae) {
final ResponseMessage error =
ResponseMessage.build(request.get())
.statusMessage(ae.getMessage())
.code(ResponseStatusCode.UNAUTHORIZED)
.create();
ctx.writeAndFlush(error);
}
} else {
final ResponseMessage error =
ResponseMessage.build(requestMessage)
.statusMessage("Failed to authenticate")
.code(ResponseStatusCode.UNAUTHORIZED)
.create();
ctx.writeAndFlush(error);
}
}
} else if (msg instanceof FullHttpMessage) { // add Authentication for HTTP requests
FullHttpMessage request = (FullHttpMessage) msg;
if (!authenticator.requireAuthentication()) {
ctx.fireChannelRead(request);
return;
}
String errorMsg =
"Invalid HTTP Header for Authentication. Expected format: 'Authorization: Basic"
+ " <Base64(user:password)>'";
if (!request.headers().contains("Authorization")) {
sendError(ctx, errorMsg, request);
return;
}
String authorizationHeader = request.headers().get("Authorization");
if (!authorizationHeader.startsWith("Basic ")) {
sendError(ctx, errorMsg, request);
return;
}
String authorization;
byte[] decodedUserPass;
try {
authorization = authorizationHeader.substring("Basic ".length());
decodedUserPass = BASE64_DECODER.decode(authorization);
} catch (Exception e) {
sendError(ctx, errorMsg, request);
return;
}
authorization = new String(decodedUserPass, Charset.forName("UTF-8"));
String[] split = authorization.split(":");
if (split.length != 2) {
sendError(
ctx,
"Invalid username or password after decoding the Base64 Authorization"
+ " header.",
request);
return;
}
Map<String, String> credentials = new HashMap();
credentials.put("username", split[0]);
credentials.put("password", split[1]);
String address = ctx.channel().remoteAddress().toString();
if (address.startsWith("/") && address.length() > 1) {
address = address.substring(1);
}
credentials.put("address", address);
try {
AuthenticatedUser user = authenticator.authenticate(credentials);
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
ctx.fireChannelRead(request);
} catch (AuthenticationException e) {
sendError(ctx, e.getMessage(), request);
}
} else {
logger.warn(
"{} received invalid request message {} - channel closing",
this.getClass().getSimpleName(),
msg.getClass());
ctx.close();
}
}