in clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java [560:655]
/**
 * Reads decrypted application data into {@code dst}.
 *
 * Any data already sitting in {@code appReadBuffer} is handed to {@code dst} first. Then, while
 * {@code dst} has space left, the method reads encrypted bytes from the socket into
 * {@code netReadBuffer}, unwraps them through the {@code SSLEngine} into {@code appReadBuffer}
 * and passes the result on to {@code dst}.
 *
 * @param dst the buffer that receives the decrypted data
 * @return the number of bytes accumulated from {@code readFromAppBuffer(dst)} calls; 0 if the
 *         transport layer is not ready; -1 if the state is {@code CLOSING}
 * @throws IOException on I/O failure. This includes {@link EOFException} when end-of-stream or
 *         an SSL close is seen before any data was read, and an {@code SSLException} rethrown
 *         from unwrap outside the post-handshake state.
 * @throws SslAuthenticationException if unwrap fails while the state is {@code POST_HANDSHAKE}
 * @throws IllegalStateException if the engine reports BUFFER_OVERFLOW / BUFFER_UNDERFLOW although
 *         the corresponding buffer already holds at least the engine's advertised size
 */
public int read(ByteBuffer dst) throws IOException {
// Nothing can be read once closing has begun, or before ready() reports true.
if (state == State.CLOSING) return -1;
else if (!ready()) return 0;
// If appReadBuffer still holds decrypted data from an earlier call, hand that to dst first.
int read = 0;
if (appReadBuffer.position() > 0) {
read = readFromAppBuffer(dst);
}
// readFromNetwork: set once any socket read in this call returned more than 0 bytes.
boolean readFromNetwork = false;
// isClosed: set when unwrap reports CLOSED but data was already obtained in this call.
boolean isClosed = false;
// Each loop reads at most once from the socket.
while (dst.remaining() > 0) {
int netread = 0;
// Grow netReadBuffer if netReadBufferSize() now reports a larger size.
netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
if (netReadBuffer.remaining() > 0) {
netread = readFromSocketChannel();
if (netread > 0)
readFromNetwork = true;
}
// Unwrap for as long as netReadBuffer holds bytes; the status branches below break out
// when no further progress is possible in this pass.
while (netReadBuffer.position() > 0) {
// Switch netReadBuffer to read mode for unwrap; compact() below switches it back to
// write mode and keeps any bytes the engine did not consume.
netReadBuffer.flip();
SSLEngineResult unwrapResult;
try {
unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
if (state == State.POST_HANDSHAKE && appReadBuffer.position() != 0) {
// For TLSv1.3, we have finished processing post-handshake messages since we are now processing data
state = State.READY;
}
} catch (SSLException e) {
// For TLSv1.3, handle SSL exceptions while processing post-handshake messages as authentication exceptions
if (state == State.POST_HANDSHAKE) {
state = State.HANDSHAKE_FAILED;
throw new SslAuthenticationException("Failed to process post-handshake messages", e);
} else
throw e;
}
netReadBuffer.compact();
// Reject renegotiation if TLS < 1.3; key updates for TLS 1.3 are allowed.
// A handshake status other than NOT_HANDSHAKING/FINISHED on an OK unwrap is treated as a
// renegotiation request.
if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
unwrapResult.getHandshakeStatus() != HandshakeStatus.FINISHED &&
unwrapResult.getStatus() == Status.OK &&
!sslEngine.getSession().getProtocol().equals(TLS13)) {
log.error("Renegotiation requested, but it is not supported, channelId {}, " +
"appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} handshakeStatus {}", channelId,
appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position(), unwrapResult.getHandshakeStatus());
throw renegotiationException();
}
if (unwrapResult.getStatus() == Status.OK) {
// Unwrap succeeded: pass the application data on to dst.
read += readFromAppBuffer(dst);
} else if (unwrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
// appReadBuffer had too little room for the unwrapped record: make sure it is at least
// applicationBufferSize() large.
int currentApplicationBufferSize = applicationBufferSize();
appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentApplicationBufferSize);
// An overflow while the buffer already holds a full application buffer's worth of data
// is not expected; fail rather than keep retrying.
if (appReadBuffer.position() >= currentApplicationBufferSize) {
throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() +
") >= application buffer size (" + currentApplicationBufferSize + ")");
}
// appReadBuffer has been extended up to currentApplicationBufferSize.
// We need to read the existing content into dst before we can unwrap again. If there is no
// space left in dst we can break here.
if (dst.hasRemaining())
read += readFromAppBuffer(dst);
else
break;
} else if (unwrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
// netReadBuffer does not yet hold a complete record: make sure it is at least
// netReadBufferSize() large, then leave the unwrap loop.
int currentNetReadBufferSize = netReadBufferSize();
netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
// NOTE(review): the guard below is ">=" while the message text says ">"; consider aligning them.
if (netReadBuffer.position() >= currentNetReadBufferSize) {
throw new IllegalStateException("Buffer underflow when available data size (" + netReadBuffer.position() +
") > packet buffer size (" + currentNetReadBufferSize + ")");
}
break;
} else if (unwrapResult.getStatus() == Status.CLOSED) {
// If data has been read and unwrapped, return the data. Close will be handled on the next poll.
if (appReadBuffer.position() == 0 && read == 0)
throw new EOFException();
else {
isClosed = true;
break;
}
}
}
// A negative socket read with nothing delivered to dst in this call is reported as EOF.
if (read == 0 && netread < 0)
throw new EOFException("EOF during read");
// Stop when the socket yielded nothing this pass, or the engine reported CLOSED.
if (netread <= 0 || isClosed)
break;
}
// NOTE(review): presumably records whether buffered data may still be pending; confirm
// against updateBytesBuffered.
updateBytesBuffered(readFromNetwork || read > 0);
// If data has been read and unwrapped, return the data even if end-of-stream, channel will be closed
// on a subsequent poll.
return read;
}