private TTransport kerberosConnect()

in storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java [140:224]


    private TTransport kerberosConnect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException {
        //login our user
        SortedMap<String, ?> authConf = ClientAuthUtils.pullConfig(conf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
        if (authConf == null) {
            throw new RuntimeException("Error in parsing the kerberos login Configuration, returned null");
        }

        boolean disableLoginCache = false;
        if (authConf.containsKey(DISABLE_LOGIN_CACHE)) {
            disableLoginCache = Boolean.valueOf((String) authConf.get(DISABLE_LOGIN_CACHE));
        }

        Login login;
        LoginCacheKey key = new LoginCacheKey(authConf);
        if (disableLoginCache) {
            LOG.debug("Kerberos Login Cache is disabled, attempting to contact the Kerberos Server");
            login = mkLogin();
            //this is to prevent the potential bug that
            //if the Login Cache is (1) enabled, and then (2) disabled and then (3) enabled again,
            //and if the LoginCacheKey remains unchanged, (3) will use the Login cache from (1), which could be wrong,
            //because the TGT cache (as well as the principle) could have been changed during (2)
            loginCache.remove(key);
        } else {
            LOG.debug("Trying to get the Kerberos Login from the Login Cache");
            login = loginCache.get(key);
            if (login == null) {
                synchronized (loginCache) {
                    login = loginCache.get(key);
                    if (login == null) {
                        LOG.debug("Kerberos Login was not found in the Login Cache, attempting to contact the Kerberos Server");
                        login = mkLogin();
                        loginCache.put(key, login);
                    }
                }
            }
        }

        final Subject subject = login.getSubject();
        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
            throw new RuntimeException("Fail to verify user principal with section \""
                    + ClientAuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file " + ClientAuthUtils.getJaasConf(conf));
        }

        final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser;
        String serviceName = ClientAuthUtils.get(conf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
        if (serviceName == null) {
            serviceName = ClientAuthUtils.SERVICE;
        }
        Map<String, String> props = new TreeMap<>();
        props.put(Sasl.QOP, "auth");
        props.put(Sasl.SERVER_AUTH, "false");

        LOG.debug("SASL GSSAPI client transport is being established");
        final TTransport sasalTransport = new TSaslClientTransport(KERBEROS,
                                                                   principal,
                                                                   serviceName,
                                                                   serverHost,
                                                                   props,
                                                                   null,
                                                                   transport);

        //open Sasl transport with the login credential
        try {
            Subject.doAs(subject,
                    new PrivilegedExceptionAction<Void>() {
                        @Override
                        public Void run() {
                            try {
                                LOG.debug("do as:" + principal);
                                sasalTransport.open();
                            } catch (Exception e) {
                                LOG.error("Client failed to open SaslClientTransport to interact with a server during "
                                                + "session initiation: "
                                                + e,
                                        e);
                        }
                        return null;
                    }
                });
        } catch (PrivilegedActionException e) {
            throw new RuntimeException(e);
        }

        return sasalTransport;
    }