in src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java [47:100]
public void onNext(ByteBuffer byteBuffer) {
    // Receives one chunk from upstream, runs the usable portion of it through
    // the cipher, and hands exactly one item to the wrapped subscriber: either
    // via wrappedSubscriber.onNext(...) directly or via finalBytes().
    final int bytesToConsume = getAmountToReadFromByteBuffer(byteBuffer);

    if (bytesToConsume <= 0) {
        // Nothing in this chunk is fed to the cipher; the incoming buffer is
        // forwarded downstream untouched.
        wrappedSubscriber.onNext(byteBuffer);
        return;
    }

    final byte[] cipherInput = BinaryUtils.copyBytesFrom(byteBuffer, bytesToConsume);
    outputBuffer = cipher.update(cipherInput, 0, bytesToConsume);

    /*
     The stream is treated as fully read once the bytes read so far plus the
     tag length reach the content length. From that point downstream is not
     expected to issue another `request`, so only one more
     `wrappedSubscriber.onNext` call is allowed (Reactive Streams permits at
     most `n` onNext calls per `request(n)`, and `n` appears to always be 1
     for the AsyncBodySubscriber). Everything that remains, including any
     cipher.doFinal() result, therefore has to go out through `finalBytes`
     rather than through a separate onNext here. This applies whether or not
     the update above produced any output.
     */
    if (contentRead.get() + tagLength >= contentLength) {
        finalBytes();
        return;
    }

    // More input is still expected. When the cipher has not yet accumulated
    // enough data to emit anything, the JCE Javadoc says update() returns null,
    // but SunJCE and ACCP return an empty array in practice, so both are
    // handled. In that case an empty buffer is sent so that downstream is not
    // left waiting; otherwise the freshly produced bytes are sent.
    final boolean nothingProduced = outputBuffer == null || outputBuffer.length == 0;
    if (nothingProduced) {
        wrappedSubscriber.onNext(ByteBuffer.allocate(0));
    } else {
        wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
    }
}