public LoginContext login()

in clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java [101:264]


    public LoginContext login() throws LoginException {

        this.lastLogin = currentElapsedTime();
        loginContext = super.login();
        subject = loginContext.getSubject();
        isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();

        AppConfigurationEntry[] entries = configuration().getAppConfigurationEntry(contextName());
        if (entries.length == 0) {
            isUsingTicketCache = false;
            principal = null;
        } else {
            // there will only be a single entry
            AppConfigurationEntry entry = entries[0];
            if (entry.getOptions().get("useTicketCache") != null) {
                String val = (String) entry.getOptions().get("useTicketCache");
                isUsingTicketCache = val.equals("true");
            } else
                isUsingTicketCache = false;
            if (entry.getOptions().get("principal") != null)
                principal = (String) entry.getOptions().get("principal");
            else
                principal = null;
        }

        if (!isKrbTicket) {
            log.debug("[Principal={}]: It is not a Kerberos ticket", principal);
            t = null;
            // if no TGT, do not bother with ticket management.
            return loginContext;
        }
        log.debug("[Principal={}]: It is a Kerberos ticket", principal);

        // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the
        // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
        // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
        //  "modprinc -maxlife 3mins <principal>" in kadmin.
        t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), () -> {
            log.info("[Principal={}]: TGT refresh thread started.", principal);
            while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
                KerberosTicket tgt = getTGT();
                long now = currentWallTime();
                long nextRefresh;
                Date nextRefreshDate;
                if (tgt == null) {
                    nextRefresh = now + minTimeBeforeRelogin;
                    nextRefreshDate = new Date(nextRefresh);
                    log.warn("[Principal={}]: No TGT found: will try again at {}", principal, nextRefreshDate);
                } else {
                    nextRefresh = getRefreshTime(tgt);
                    long expiry = tgt.getEndTime().getTime();
                    Date expiryDate = new Date(expiry);
                    if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) {
                        log.warn("The TGT cannot be renewed beyond the next expiry date: {}." +
                            "This process will not be able to authenticate new SASL connections after that " +
                            "time (for example, it will not be able to authenticate a new connection with a Kafka " +
                            "Broker).  Ask your system administrator to either increase the " +
                            "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " +
                            "kadmin, or instead, to generate a keytab for {}. Because the TGT's " +
                            "expiry cannot be further extended by refreshing, exiting refresh thread now.",
                            expiryDate, principal, principal);
                        return;
                    }
                    // determine how long to sleep from looking at ticket's expiry.
                    // We should not allow the ticket to expire, but we should take into consideration
                    // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so
                    // would cause ticket expiration.
                    if ((nextRefresh > expiry) || (minTimeBeforeRelogin > expiry - now)) {
                        // expiry is before next scheduled refresh.
                        log.info("[Principal={}]: Refreshing now because expiry is before next scheduled refresh time.", principal);
                        nextRefresh = now;
                    } else {
                        if (nextRefresh - now < minTimeBeforeRelogin) {
                            // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
                            Date until = new Date(nextRefresh);
                            Date newUntil = new Date(now + minTimeBeforeRelogin);
                            log.warn("[Principal={}]: TGT refresh thread time adjusted from {} to {} since the former is sooner " +
                                "than the minimum refresh interval ({} seconds) from now.",
                                principal, until, newUntil, minTimeBeforeRelogin / 1000);
                        }
                        nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin);
                    }
                    nextRefreshDate = new Date(nextRefresh);
                    if (nextRefresh > expiry) {
                        log.error("[Principal={}]: Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." +
                            "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.",
                            principal, nextRefreshDate, expiryDate);
                        return;
                    }
                }
                if (now < nextRefresh) {
                    Date until = new Date(nextRefresh);
                    log.info("[Principal={}]: TGT refresh sleeping until: {}", principal, until);
                    try {
                        Thread.sleep(nextRefresh - now);
                    } catch (InterruptedException ie) {
                        log.warn("[Principal={}]: TGT renewal thread has been interrupted and will exit.", principal);
                        return;
                    }
                } else {
                    log.error("[Principal={}]: NextRefresh: {} is in the past: exiting refresh thread. Check"
                        + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
                        + " Manual intervention will be required for this client to successfully authenticate."
                        + " Exiting refresh thread.", principal, nextRefreshDate);
                    return;
                }
                if (isUsingTicketCache) {
                    String kinitArgs = "-R";
                    int retry = 1;
                    while (retry >= 0) {
                        try {
                            log.debug("[Principal={}]: Running ticket cache refresh command: {} {}", principal, kinitCmd, kinitArgs);
                            Shell.execCommand(kinitCmd, kinitArgs);
                            break;
                        } catch (Exception e) {
                            if (retry > 0) {
                                log.warn("[Principal={}]: Error when trying to renew with TicketCache, but will retry ", principal, e);
                                --retry;
                                // sleep for 10 seconds
                                try {
                                    Thread.sleep(10 * 1000);
                                } catch (InterruptedException ie) {
                                    log.error("[Principal={}]: Interrupted while renewing TGT, exiting Login thread", principal);
                                    return;
                                }
                            } else {
                                log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'. " +
                                    "Exiting refresh thread.", principal, kinitCmd, kinitArgs, e);
                                return;
                            }
                        }
                    }
                }
                try {
                    int retry = 1;
                    while (retry >= 0) {
                        try {
                            reLogin();
                            break;
                        } catch (LoginException le) {
                            if (retry > 0) {
                                log.warn("[Principal={}]: Error when trying to re-Login, but will retry ", principal, le);
                                --retry;
                                // sleep for 10 seconds.
                                try {
                                    Thread.sleep(10 * 1000);
                                } catch (InterruptedException e) {
                                    log.error("[Principal={}]: Interrupted during login retry after LoginException:", principal, le);
                                    throw le;
                                }
                            } else {
                                log.error("[Principal={}]: Could not refresh TGT.", principal, le);
                            }
                        }
                    }
                } catch (LoginException le) {
                    log.error("[Principal={}]: Failed to refresh TGT: refresh thread exiting now.", principal, le);
                    return;
                }
            }
        });
        t.start();
        return loginContext;
    }