public void channelRead()

in gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java [81:197]


    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        if (!(msg instanceof RequestMessage)) {
            logger.warn("{} only processes RequestMessage instances - received {} - channel closing",
                    this.getClass().getSimpleName(), msg.getClass());
            ctx.close();
            return;
        }

        final RequestMessage requestMessage = (RequestMessage) msg;

        final Attribute<Authenticator.SaslNegotiator> negotiator = ctx.channel().attr(StateKey.NEGOTIATOR);
        final Attribute<RequestMessage> request = ctx.channel().attr(StateKey.REQUEST_MESSAGE);
        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);

        if (negotiator.get() == null) {
            try {
                // First time through so save the request and send an AUTHENTICATE challenge with no data
                negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
                request.set(requestMessage);
                final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
                        .code(ResponseStatusCode.AUTHENTICATE).create();
                ctx.writeAndFlush(authenticate);
            } catch (Exception ex) {
                // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
                // to hang until timeout which isn't so nice. treating this like a server error as it means that
                // the Authenticator isn't really ready to deal with requests for some reason.
                logger.error(String.format("%s is not ready to handle requests - check its configuration or related services",
                        authenticator.getClass().getSimpleName()), ex);

                respondWithError(
                        requestMessage,
                        builder -> builder.statusMessage("Authenticator is not ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
                        ctx);
            }

            return;
        } else if (!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
            // If authentication negotiation is pending, store subsequent non-authentication requests for later processing
            deferredRequests.setIfAbsent(new ImmutablePair<>(LocalDateTime.now(), new ArrayList<>()));
            deferredRequests.get().getValue().add(requestMessage);

            final Duration deferredDuration = Duration.between(deferredRequests.get().getKey(), LocalDateTime.now());

            if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 0) {
                respondWithError(
                        requestMessage,
                        builder -> builder.statusMessage("Authentication did not finish in the allowed duration (" + MAX_REQUEST_DEFERRABLE_DURATION + "s).")
                                .code(ResponseStatusCode.UNAUTHORIZED),
                        ctx);
                return;
            }

            return;
        } else if (!requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
            // This is an authentication request that is missing a "sasl" argument.
            respondWithError(
                    requestMessage,
                    builder -> builder.statusMessage("Failed to authenticate").code(ResponseStatusCode.UNAUTHORIZED),
                    ctx);
            return;
        }

        final Object saslObject = requestMessage.getArgs().get(Tokens.ARGS_SASL);

        if (!(saslObject instanceof String)) {
            respondWithError(
                    requestMessage,
                    builder -> builder
                            .statusMessage("Incorrect type for : " + Tokens.ARGS_SASL + " - base64 encoded String is expected")
                            .code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST),
                    ctx);
            return;
        }

        try {
            final byte[] saslResponse = BASE64_DECODER.decode((String) saslObject);
            final byte[] saslMessage = negotiator.get().evaluateResponse(saslResponse);

            if (!negotiator.get().isComplete()) {
                // not done here - send back the sasl message for next challenge.
                final HashMap<String, Object> metadata = new HashMap<>();
                metadata.put(Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(saslMessage));
                final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
                        .statusAttributes(metadata)
                        .code(ResponseStatusCode.AUTHENTICATE).create();
                ctx.writeAndFlush(authenticate);
                return;
            }

            final org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser user = negotiator.get().getAuthenticatedUser();
            ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
            // User name logged with the remote socket address and authenticator classname for audit logging
            if (settings.enableAuditLog) {
                String address = ctx.channel().remoteAddress().toString();
                if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
                final String[] authClassParts = authenticator.getClass().toString().split("[.]");
                auditLogger.info("User {} with address {} authenticated by {}",
                        user.getName(), address, authClassParts[authClassParts.length - 1]);
            }
            // If we have got here we are authenticated so remove the handler and pass
            // the original message down the pipeline for processing
            ctx.pipeline().remove(this);
            final RequestMessage original = request.get();
            ctx.fireChannelRead(original);

            // Also send deferred requests if there are any down the pipeline for processing
            if (deferredRequests.get() != null) {
                deferredRequests.getAndSet(null).getValue().forEach(ctx::fireChannelRead);
            }
        } catch (AuthenticationException ae) {
            negotiator.set(null);
            respondWithError(
                    requestMessage,
                    builder -> builder.statusMessage(ae.getMessage()).code(ResponseStatusCode.UNAUTHORIZED),
                    ctx);
        }
    }