in clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java [321:410]
private void doHandshake() throws IOException {
boolean read = key.isReadable();
boolean write = key.isWritable();
handshakeStatus = sslEngine.getHandshakeStatus();
if (!flush(netWriteBuffer)) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
return;
}
// Throw any pending handshake exception since `netWriteBuffer` has been flushed
maybeThrowSslAuthenticationException();
switch (handshakeStatus) {
case NEED_TASK:
log.trace("SSLHandshake NEED_TASK channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
handshakeStatus = runDelegatedTasks();
break;
case NEED_WRAP:
log.trace("SSLHandshake NEED_WRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
handshakeResult = handshakeWrap(write);
if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
int currentNetWriteBufferSize = netWriteBufferSize();
netWriteBuffer.compact();
netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
netWriteBuffer.flip();
if (netWriteBuffer.limit() >= currentNetWriteBufferSize) {
throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.limit() +
") >= network buffer size (" + currentNetWriteBufferSize + ")");
}
} else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
} else if (handshakeResult.getStatus() == Status.CLOSED) {
throw new EOFException();
}
log.trace("SSLHandshake NEED_WRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
//if handshake status is not NEED_UNWRAP or unable to flush netWriteBuffer contents
//we will break here otherwise we can do need_unwrap in the same call.
if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || !flush(netWriteBuffer)) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
break;
}
case NEED_UNWRAP:
log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
do {
handshakeResult = handshakeUnwrap(read, false);
if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
int currentAppBufferSize = applicationBufferSize();
appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
if (appReadBuffer.position() > currentAppBufferSize) {
throw new IllegalStateException("Buffer underflow when available data size (" + appReadBuffer.position() +
") > packet buffer size (" + currentAppBufferSize + ")");
}
}
} while (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW);
if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
int currentNetReadBufferSize = netReadBufferSize();
netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
if (netReadBuffer.position() >= currentNetReadBufferSize) {
throw new IllegalStateException("Buffer underflow when there is available data");
}
} else if (handshakeResult.getStatus() == Status.CLOSED) {
throw new EOFException("SSL handshake status CLOSED during handshake UNWRAP");
}
log.trace("SSLHandshake NEED_UNWRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
//if handshakeStatus completed than fall-through to finished status.
//after handshake is finished there is no data left to read/write in socketChannel.
//so the selector won't invoke this channel if we don't go through the handshakeFinished here.
if (handshakeStatus != HandshakeStatus.FINISHED) {
if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
break;
}
case FINISHED:
handshakeFinished();
break;
case NOT_HANDSHAKING:
handshakeFinished();
break;
default:
throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
}
}