public void channelRead()

in interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/auth/IrAuthenticationHandler.java [88:280]


    /**
     * Authenticates inbound messages before they are passed down the pipeline.
     *
     * <p>Two message kinds are handled:
     *
     * <ul>
     *   <li>{@link RequestMessage}: a SASL exchange. The first request seen on the channel is
     *       stored in a channel attribute and answered with an {@code AUTHENTICATE} challenge
     *       (or forwarded immediately when the authenticator does not require authentication).
     *       Subsequent requests must be authentication ops carrying a Base64 encoded SASL
     *       payload; once the negotiation completes this handler removes itself from the
     *       pipeline and forwards the stored original request.
     *   <li>{@link FullHttpMessage}: HTTP Basic authentication via the {@code Authorization}
     *       header. The decoded value is split on the <em>first</em> colon only, so passwords
     *       may contain colons. Values without a colon or with an empty password are rejected.
     * </ul>
     *
     * <p>Any other message type is logged and the channel is closed.
     *
     * @param ctx the handler context of the channel the message arrived on
     * @param msg the inbound message
     * @throws Exception if processing the message fails unexpectedly
     */
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        if (msg instanceof RequestMessage) {
            final RequestMessage requestMessage = (RequestMessage) msg;

            final Attribute<Authenticator.SaslNegotiator> negotiator =
                    ((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
            final Attribute<RequestMessage> request =
                    ((AttributeMap) ctx).attr(StateKey.REQUEST_MESSAGE);

            if (negotiator.get() == null) {
                try {
                    // First time through so save the request and send an AUTHENTICATE challenge
                    // with no data
                    negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
                    request.set(requestMessage);
                    // the authentication flag is off, just pass the original message down the
                    // pipeline for processing
                    if (!authenticator.requireAuthentication()) {
                        ctx.pipeline().remove(this);
                        final RequestMessage original = request.get();
                        ctx.fireChannelRead(original);
                    } else {
                        final ResponseMessage authenticate =
                                ResponseMessage.build(requestMessage)
                                        .code(ResponseStatusCode.AUTHENTICATE)
                                        .create();
                        ctx.writeAndFlush(authenticate);
                    }
                } catch (Exception ex) {
                    // newSaslNegotiator can cause troubles - if we don't catch and respond nicely
                    // the driver seems to hang until timeout which isn't so nice. treating this
                    // like a server error as it means that the Authenticator isn't really ready
                    // to deal with requests for some reason.
                    logger.error(
                            String.format(
                                    "%s is not ready to handle requests - check its configuration"
                                            + " or related services",
                                    authenticator.getClass().getSimpleName()),
                            ex);

                    final ResponseMessage error =
                            ResponseMessage.build(requestMessage)
                                    .statusMessage("Authenticator is not ready to handle requests")
                                    .code(ResponseStatusCode.SERVER_ERROR)
                                    .create();
                    ctx.writeAndFlush(error);
                }
            } else {
                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)
                        && requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {

                    final Object saslObject = requestMessage.getArgs().get(Tokens.ARGS_SASL);

                    // Stays null when the argument is not a String or is not valid Base64; both
                    // cases are reported to the client as a malformed request below.
                    byte[] decodedSasl = null;
                    if (saslObject instanceof String) {
                        try {
                            decodedSasl = BASE64_DECODER.decode((String) saslObject);
                        } catch (IllegalArgumentException ignored) {
                            // Malformed Base64 from the client: answer with a proper error
                            // response instead of letting the exception escape the handler.
                        }
                    }

                    if (decodedSasl == null) {
                        final ResponseMessage error =
                                ResponseMessage.build(request.get())
                                        .statusMessage(
                                                "Incorrect type for : "
                                                        + Tokens.ARGS_SASL
                                                        + " - base64 encoded String is expected")
                                        .code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST)
                                        .create();
                        ctx.writeAndFlush(error);
                        return;
                    }

                    final byte[] saslResponse = decodedSasl;

                    try {
                        final byte[] saslMessage = negotiator.get().evaluateResponse(saslResponse);
                        if (negotiator.get().isComplete()) {
                            final AuthenticatedUser user = negotiator.get().getAuthenticatedUser();
                            ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
                            // Username logged with the remote socket address and authenticator
                            // classname for audit logging
                            if (settings.enableAuditLog || settings.authentication.enableAuditLog) {
                                String address = ctx.channel().remoteAddress().toString();
                                if (address.startsWith("/") && address.length() > 1) {
                                    address = address.substring(1);
                                }
                                final String[] authClassParts =
                                        authenticator.getClass().toString().split("[.]");
                                auditLogger.info(
                                        "User {} with address {} authenticated by {}",
                                        user.getName(),
                                        address,
                                        authClassParts[authClassParts.length - 1]);
                            }
                            // If we have got here we are authenticated so remove the handler and
                            // pass the original message down the pipeline for processing
                            ctx.pipeline().remove(this);
                            final RequestMessage original = request.get();
                            ctx.fireChannelRead(original);
                        } else {
                            // not done here - send back the sasl message for next challenge.
                            final Map<String, Object> metadata = new HashMap<>();
                            metadata.put(
                                    Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(saslMessage));
                            final ResponseMessage authenticate =
                                    ResponseMessage.build(requestMessage)
                                            .statusAttributes(metadata)
                                            .code(ResponseStatusCode.AUTHENTICATE)
                                            .create();
                            ctx.writeAndFlush(authenticate);
                        }
                    } catch (AuthenticationException ae) {
                        final ResponseMessage error =
                                ResponseMessage.build(request.get())
                                        .statusMessage(ae.getMessage())
                                        .code(ResponseStatusCode.UNAUTHORIZED)
                                        .create();
                        ctx.writeAndFlush(error);
                    }
                } else {
                    // A negotiation is in progress but the client sent something other than an
                    // authentication op with a SASL payload.
                    final ResponseMessage error =
                            ResponseMessage.build(requestMessage)
                                    .statusMessage("Failed to authenticate")
                                    .code(ResponseStatusCode.UNAUTHORIZED)
                                    .create();
                    ctx.writeAndFlush(error);
                }
            }
        } else if (msg instanceof FullHttpMessage) { // add Authentication for HTTP requests
            FullHttpMessage request = (FullHttpMessage) msg;

            if (!authenticator.requireAuthentication()) {
                ctx.fireChannelRead(request);
                return;
            }

            String errorMsg =
                    "Invalid HTTP Header for Authentication. Expected format: 'Authorization: Basic"
                            + " <Base64(user:password)>'";

            if (!request.headers().contains("Authorization")) {
                sendError(ctx, errorMsg, request);
                return;
            }

            String authorizationHeader = request.headers().get("Authorization");
            if (!authorizationHeader.startsWith("Basic ")) {
                sendError(ctx, errorMsg, request);
                return;
            }

            String authorization;
            byte[] decodedUserPass;
            try {
                authorization = authorizationHeader.substring("Basic ".length());
                decodedUserPass = BASE64_DECODER.decode(authorization);
            } catch (Exception e) {
                sendError(ctx, errorMsg, request);
                return;
            }

            authorization = new String(decodedUserPass, Charset.forName("UTF-8"));
            // Split on the FIRST colon only: the user-id cannot contain a colon but the password
            // may. A missing separator or an empty password is rejected.
            final int separator = authorization.indexOf(':');
            if (separator < 0 || separator == authorization.length() - 1) {
                sendError(
                        ctx,
                        "Invalid username or password after decoding the Base64 Authorization"
                                + " header.",
                        request);
                return;
            }

            Map<String, String> credentials = new HashMap<>();
            credentials.put("username", authorization.substring(0, separator));
            credentials.put("password", authorization.substring(separator + 1));
            String address = ctx.channel().remoteAddress().toString();
            if (address.startsWith("/") && address.length() > 1) {
                address = address.substring(1);
            }

            credentials.put("address", address);

            try {
                AuthenticatedUser user = authenticator.authenticate(credentials);
                ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
                ctx.fireChannelRead(request);
            } catch (AuthenticationException e) {
                sendError(ctx, e.getMessage(), request);
            }
        } else {
            logger.warn(
                    "{} received invalid request message {} - channel closing",
                    this.getClass().getSimpleName(),
                    msg.getClass());
            ctx.close();
        }
    }