public LoginContext login()

in clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java [95:258]


    /**
     * Performs the login through {@code super.login()} and, when the resulting subject holds a
     * {@link KerberosTicket}, starts a background thread (created via {@code KafkaThread.daemon})
     * that keeps refreshing the Ticket Granting Ticket (TGT).
     *
     * <p>As a side effect this method updates {@code lastLogin}, {@code loginContext}, {@code subject},
     * {@code isKrbTicket}, {@code isUsingTicketCache}, {@code principal} and {@code t}. The
     * {@code useTicketCache} and {@code principal} settings are read from the first JAAS configuration
     * entry; when there is no entry they default to {@code false} and {@code null} respectively.
     *
     * <p>If the subject has no Kerberos ticket, no refresh thread is created ({@code t} is set to
     * {@code null}) and the login context is returned immediately.
     *
     * @return the login context obtained from {@code super.login()}
     * @throws LoginException if the underlying login fails
     */
    public LoginContext login() throws LoginException {

        this.lastLogin = currentElapsedTime();
        loginContext = super.login();
        subject = loginContext.getSubject();
        isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();

        List<AppConfigurationEntry> entries = jaasContext().configurationEntries();
        if (entries.isEmpty()) {
            isUsingTicketCache = false;
            principal = null;
        } else {
            // there will only be a single entry
            AppConfigurationEntry entry = entries.get(0);
            // Look each option up once; an absent option yields null, which maps to
            // "ticket cache disabled" and "no principal" respectively.
            String useTicketCacheOption = (String) entry.getOptions().get("useTicketCache");
            isUsingTicketCache = "true".equals(useTicketCacheOption);
            principal = (String) entry.getOptions().get("principal");
        }

        if (!isKrbTicket) {
            log.debug("[Principal={}]: It is not a Kerberos ticket", principal);
            t = null;
            // if no TGT, do not bother with ticket management.
            return loginContext;
        }
        log.debug("[Principal={}]: It is a Kerberos ticket", principal);

        // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the
        // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
        // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
        //  "modprinc -maxlife 3mins <principal>" in kadmin.
        t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
            @Override
            public void run() {
                log.info("[Principal={}]: TGT refresh thread started.", principal);
                while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
                    KerberosTicket tgt = getTGT();
                    long now = currentWallTime();
                    long nextRefresh;
                    Date nextRefreshDate;
                    if (tgt == null) {
                        // No ticket to inspect: simply try again after the minimum relogin interval.
                        nextRefresh = now + minTimeBeforeRelogin;
                        nextRefreshDate = new Date(nextRefresh);
                        log.warn("[Principal={}]: No TGT found: will try again at {}", principal, nextRefreshDate);
                    } else {
                        nextRefresh = getRefreshTime(tgt);
                        long expiry = tgt.getEndTime().getTime();
                        Date expiryDate = new Date(expiry);
                        if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) {
                            log.warn("The TGT cannot be renewed beyond the next expiry date: {}. " +
                                    "This process will not be able to authenticate new SASL connections after that " +
                                    "time (for example, it will not be able to authenticate a new connection with a Kafka " +
                                    "Broker).  Ask your system administrator to either increase the " +
                                    "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " +
                                    "kadmin, or instead, to generate a keytab for {}. Because the TGT's " +
                                    "expiry cannot be further extended by refreshing, exiting refresh thread now.",
                                    expiryDate, principal, principal);
                            return;
                        }
                        // determine how long to sleep from looking at ticket's expiry.
                        // We should not allow the ticket to expire, but we should take into consideration
                        // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so
                        // would cause ticket expiration.
                        if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) {
                            // expiry is before next scheduled refresh).
                            log.info("[Principal={}]: Refreshing now because expiry is before next scheduled refresh time.", principal);
                            // NOTE(review): with nextRefresh == now the "now < nextRefresh" check further down is
                            // false, so the thread takes the "is in the past" branch and exits rather than
                            // refreshing. Left as-is: changing it alters when the thread exits and depends on
                            // reLogin() behaviour that is not visible in this method.
                            nextRefresh = now;
                        } else {
                            if (nextRefresh < (now + minTimeBeforeRelogin)) {
                                // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
                                Date until = new Date(nextRefresh);
                                Date newUntil = new Date(now + minTimeBeforeRelogin);
                                log.warn("[Principal={}]: TGT refresh thread time adjusted from {} to {} since the former is sooner " +
                                        "than the minimum refresh interval ({} seconds) from now.",
                                        principal, until, newUntil, minTimeBeforeRelogin / 1000);
                            }
                            nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin);
                        }
                        nextRefreshDate = new Date(nextRefresh);
                        if (nextRefresh > expiry) {
                            log.error("[Principal={}]: Next refresh: {} is later than expiry {}. This may indicate a clock skew problem. " +
                                    "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.",
                                    principal, nextRefreshDate, expiryDate);
                            return;
                        }
                    }
                    if (now < nextRefresh) {
                        Date until = new Date(nextRefresh);
                        log.info("[Principal={}]: TGT refresh sleeping until: {}", principal, until);
                        try {
                            Thread.sleep(nextRefresh - now);
                        } catch (InterruptedException ie) {
                            log.warn("[Principal={}]: TGT renewal thread has been interrupted and will exit.", principal);
                            return;
                        }
                    } else {
                        log.error("[Principal={}]: NextRefresh: {} is in the past: exiting refresh thread. Check"
                                + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
                                + " Manual intervention will be required for this client to successfully authenticate."
                                + " Exiting refresh thread.", principal, nextRefreshDate);
                        return;
                    }
                    if (isUsingTicketCache) {
                        // Renew the cached ticket with "kinit -R": one attempt plus one retry after 10 seconds.
                        String kinitArgs = "-R";
                        int retry = 1;
                        while (retry >= 0) {
                            try {
                                log.debug("[Principal={}]: Running ticket cache refresh command: {} {}", principal, kinitCmd, kinitArgs);
                                Shell.execCommand(kinitCmd, kinitArgs);
                                break;
                            } catch (Exception e) {
                                if (retry > 0) {
                                    --retry;
                                    // sleep for 10 seconds
                                    try {
                                        Thread.sleep(10 * 1000);
                                    } catch (InterruptedException ie) {
                                        log.error("[Principal={}]: Interrupted while renewing TGT, exiting Login thread", principal);
                                        return;
                                    }
                                } else {
                                    log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'. " +
                                            "Exiting refresh thread.", principal, kinitCmd, kinitArgs, e);
                                    return;
                                }
                            }
                        }
                    }
                    try {
                        // Re-login: one attempt plus one retry after 10 seconds.
                        int retry = 1;
                        while (retry >= 0) {
                            try {
                                reLogin();
                                break;
                            } catch (LoginException le) {
                                if (retry > 0) {
                                    --retry;
                                    // sleep for 10 seconds.
                                    try {
                                        Thread.sleep(10 * 1000);
                                    } catch (InterruptedException e) {
                                        log.error("[Principal={}]: Interrupted during login retry after LoginException:", principal, le);
                                        throw le;
                                    }
                                } else {
                                    log.error("[Principal={}]: Could not refresh TGT.", principal, le);
                                    // Retries are exhausted. Leave the retry loop explicitly: without this the
                                    // loop condition stays true and reLogin() would be re-invoked back-to-back
                                    // with no delay for as long as it keeps failing. The outer loop then
                                    // re-reads the TGT and schedules (or abandons) the next refresh.
                                    break;
                                }
                            }
                        }
                    } catch (LoginException le) {
                        log.error("[Principal={}]: Failed to refresh TGT: refresh thread exiting now.", principal, le);
                        return;
                    }
                }
            }
        });
        t.start();
        return loginContext;
    }