in clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java [95:258]
/**
 * Logs in through the parent class and, when the resulting subject carries a Kerberos ticket,
 * starts a daemon thread that periodically refreshes the Ticket Granting Ticket (TGT).
 *
 * <p>The {@code useTicketCache} and {@code principal} options are read from the first JAAS
 * configuration entry; when the entry or an option is missing they default to {@code false} and
 * {@code null} respectively. If the subject holds no {@link KerberosTicket} credential, no refresh
 * thread is created and {@code t} is set to {@code null}.
 *
 * @return the login context obtained from the parent login
 * @throws LoginException propagated from the underlying login
 */
public LoginContext login() throws LoginException {
    this.lastLogin = currentElapsedTime();
    loginContext = super.login();
    subject = loginContext.getSubject();
    isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();

    List<AppConfigurationEntry> entries = jaasContext().configurationEntries();
    if (entries.isEmpty()) {
        isUsingTicketCache = false;
        principal = null;
    } else {
        // there will only be a single entry
        AppConfigurationEntry entry = entries.get(0);
        Object useTicketCacheOption = entry.getOptions().get("useTicketCache");
        Object principalOption = entry.getOptions().get("principal");
        // Only the exact string "true" enables ticket-cache handling.
        isUsingTicketCache = useTicketCacheOption != null && ((String) useTicketCacheOption).equals("true");
        principal = principalOption != null ? (String) principalOption : null;
    }

    if (!isKrbTicket) {
        log.debug("[Principal={}]: It is not a Kerberos ticket", principal);
        t = null;
        // if no TGT, do not bother with ticket management.
        return loginContext;
    }
    log.debug("[Principal={}]: It is a Kerberos ticket", principal);

    // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the
    // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
    // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
    // "modprinc -maxlife 3mins <principal>" in kadmin.
    t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
        @Override
        public void run() {
            log.info("[Principal={}]: TGT refresh thread started.", principal);
            while (true) { // renewal thread's main loop. if it exits from here, thread will exit.
                KerberosTicket tgt = getTGT();
                long now = currentWallTime();
                long nextRefresh;
                Date nextRefreshDate;
                if (tgt == null) {
                    nextRefresh = now + minTimeBeforeRelogin;
                    nextRefreshDate = new Date(nextRefresh);
                    log.warn("[Principal={}]: No TGT found: will try again at {}", principal, nextRefreshDate);
                } else {
                    nextRefresh = getRefreshTime(tgt);
                    long expiry = tgt.getEndTime().getTime();
                    Date expiryDate = new Date(expiry);
                    if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) {
                        log.warn("[Principal={}]: The TGT cannot be renewed beyond the next expiry date: {}. " +
                                "This process will not be able to authenticate new SASL connections after that " +
                                "time (for example, it will not be able to authenticate a new connection with a Kafka " +
                                "Broker). Ask your system administrator to either increase the " +
                                "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " +
                                "kadmin, or instead, to generate a keytab for {}. Because the TGT's " +
                                "expiry cannot be further extended by refreshing, exiting refresh thread now.",
                                principal, expiryDate, principal, principal);
                        return;
                    }
                    // determine how long to sleep from looking at ticket's expiry.
                    // We should not allow the ticket to expire, but we should take into consideration
                    // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so
                    // would cause ticket expiration.
                    if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) {
                        // expiry is before next scheduled refresh).
                        // NOTE(review): with nextRefresh == now the "now < nextRefresh" test further down is
                        // false, so this path ends in the "in the past" branch and the thread exits rather
                        // than refreshing immediately - confirm that this is intended.
                        log.info("[Principal={}]: Refreshing now because expiry is before next scheduled refresh time.", principal);
                        nextRefresh = now;
                    } else {
                        if (nextRefresh < (now + minTimeBeforeRelogin)) {
                            // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
                            Date until = new Date(nextRefresh);
                            Date newUntil = new Date(now + minTimeBeforeRelogin);
                            log.warn("[Principal={}]: TGT refresh thread time adjusted from {} to {} since the former is sooner " +
                                    "than the minimum refresh interval ({} seconds) from now.",
                                    principal, until, newUntil, minTimeBeforeRelogin / 1000);
                        }
                        nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin);
                    }
                    nextRefreshDate = new Date(nextRefresh);
                    if (nextRefresh > expiry) {
                        log.error("[Principal={}]: Next refresh: {} is later than expiry {}. This may indicate a clock skew problem. " +
                                "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.",
                                principal, nextRefreshDate, expiryDate);
                        return;
                    }
                }

                if (now < nextRefresh) {
                    Date until = new Date(nextRefresh);
                    log.info("[Principal={}]: TGT refresh sleeping until: {}", principal, until);
                    try {
                        Thread.sleep(nextRefresh - now);
                    } catch (InterruptedException ie) {
                        log.warn("[Principal={}]: TGT renewal thread has been interrupted and will exit.", principal);
                        // Restore the interrupt status only after logging, right before the thread ends.
                        Thread.currentThread().interrupt();
                        return;
                    }
                } else {
                    log.error("[Principal={}]: NextRefresh: {} is in the past: exiting refresh thread. Check"
                            + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
                            + " Manual intervention will be required for this client to successfully authenticate."
                            + " Exiting refresh thread.", principal, nextRefreshDate);
                    return;
                }

                if (isUsingTicketCache && !renewTicketCacheWithRetry())
                    return;
                if (!reLoginWithRetry())
                    return;
            }
        }
    });
    t.start();
    return loginContext;
}

/**
 * Runs {@code kinitCmd -R} to renew the ticket cache, retrying once after a 10 second pause.
 *
 * @return {@code true} if the command succeeded; {@code false} if the refresh thread must exit
 *         because the command failed twice or the pause was interrupted
 */
private boolean renewTicketCacheWithRetry() {
    String kinitArgs = "-R";
    for (int retriesLeft = 1; ; retriesLeft--) {
        try {
            log.debug("[Principal={}]: Running ticket cache refresh command: {} {}", principal, kinitCmd, kinitArgs);
            Shell.execCommand(kinitCmd, kinitArgs);
            return true;
        } catch (Exception e) {
            if (retriesLeft <= 0) {
                log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'. " +
                        "Exiting refresh thread.", principal, kinitCmd, kinitArgs, e);
                return false;
            }
            // sleep for 10 seconds
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException ie) {
                log.error("[Principal={}]: Interrupted while renewing TGT, exiting Login thread", principal);
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}

/**
 * Calls {@code reLogin()}, retrying once after a 10 second pause if it throws.
 *
 * <p>When the retry fails as well, the failure is logged and this refresh cycle is abandoned so the
 * caller's main loop can re-read the TGT and schedule the next attempt. Staying in the retry loop
 * at that point would call {@code reLogin()} again and again with no delay for as long as it keeps
 * throwing.
 *
 * @return {@code true} if the refresh thread should keep running; {@code false} if it was
 *         interrupted while waiting to retry and must exit
 */
private boolean reLoginWithRetry() {
    for (int retriesLeft = 1; ; retriesLeft--) {
        try {
            reLogin();
            return true;
        } catch (LoginException le) {
            if (retriesLeft <= 0) {
                log.error("[Principal={}]: Could not refresh TGT.", principal, le);
                return true;
            }
            // sleep for 10 seconds.
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                log.error("[Principal={}]: Interrupted during login retry after LoginException:", principal, le);
                log.error("[Principal={}]: Failed to refresh TGT: refresh thread exiting now.", principal, le);
                // Restore the interrupt status only after logging, right before the thread ends.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}