in src/main/java/com/amazonaws/kinesisvideo/internal/producer/jni/NativeKinesisVideoProducerStream.java [60:142]
/**
 * Reads up to {@code len} bytes of stream data from the native producer into {@code b},
 * starting at offset {@code off}.
 *
 * <p>The call blocks until the native layer yields a non-zero number of bytes, signals
 * end-of-stream, or the stream is marked closed. Blocking is done with a timed
 * {@code wait} on {@code mMonitor}; the flags are re-checked after every wake-up.
 *
 * <p>An interrupt received while waiting does not abort the read. It is logged, the wait
 * continues, and the thread's interrupt status is restored before this method exits so
 * callers can still observe it.
 *
 * @param b   destination buffer
 * @param off offset in {@code b} at which the data is written
 * @param len maximum number of bytes to read
 * @return the number of bytes read; {@code 0} if {@code len} is zero and the stream is open;
 *         {@code -1} if the stream is closed or end-of-stream was reached with no data
 * @throws IOException if the native producer throws a {@code ProducerException} while reading
 */
public int read(final byte[] b,
                final int off,
                final int len)
        throws IOException {
    if (mStreamClosed) {
        mLog.warn("Stream %s with uploadHandle %d has been closed", mStreamInfo.getName(), mUploadHandle);
    } else if (0 == len) {
        // A zero-length request can never yield a positive byte count, so the loop below
        // could only end on stream close. Return 0 immediately, as the
        // InputStream#read(byte[], int, int) contract specifies.
        return 0;
    }

    // Set when an interrupt is swallowed while waiting; restored in the finally block.
    boolean interrupted = false;

    // -1 is the end-of-stream value returned when the loop is never entered.
    int bytesRead = -1;

    try {
        // Read from the KinesisVideo Producer.
        // NOTE: This is a blocking call. Blocking is implemented by a timed wait on
        // mMonitor until data is reported available or the stream is closed.
        while (!mStreamClosed) {
            synchronized (mMonitor) {
                while (!mDataAvailable && !mStreamClosed) {
                    try {
                        mLog.debug("no data for stream %s with uploadHandle %d, waiting", mStreamInfo.getName(),
                                mUploadHandle);
                        mMonitor.wait(TIMEOUT_IN_MS);
                    } catch (final InterruptedException e) {
                        // Do not re-interrupt here: wait() would throw again at once and the
                        // loop would spin. Record it and restore the status on exit.
                        interrupted = true;
                        mLog.exception(e, "Waiting for the data availability with uploadHandle %d"
                                + " threw an interrupted exception. Continuing...", mUploadHandle);
                    }
                }

                // Clear the availability indicator for now
                mDataAvailable = false;

                if (mStreamClosed) {
                    // Indicate the EOS
                    bytesRead = -1;
                    mLog.debug("Being notified to close stream %s with uploadHandle %d",
                            mStreamInfo.getName(), mUploadHandle);
                    return bytesRead;
                }
            }

            try {
                mKinesisVideoProducerJni.getStreamData(mStreamHandle, mUploadHandle, b, off, len, mReadResult);
                bytesRead = mReadResult.getReadBytes();
                mLog.debug("getStreamData fill %d bytes for stream %s with uploadHandle %d", bytesRead,
                        mStreamInfo.getName(), mUploadHandle);

                if (mReadResult.isEndOfStream()) {
                    // EOS for current session
                    mLog.info("Received end-of-stream indicator for %s, uploadHandle %d",
                            mStreamInfo.getName(), mUploadHandle);

                    // Set the flag so the stream is not valid any longer
                    mStreamClosed = true;

                    if (0 == bytesRead) {
                        // Nothing was delivered together with the EOS marker - indicate the EOS
                        bytesRead = -1;
                    }
                }

                synchronized (mMonitor) {
                    if (bytesRead != 0) {
                        // Got some bytes (or EOS) - break from the loop.
                        // Make sure we don't await again if we still have some data.
                        // NOTE(review): mAvailableDataSize is maintained outside this method;
                        // presumably by the data-available notification - confirm.
                        if (bytesRead != -1 && mAvailableDataSize - bytesRead > 0) {
                            mDataAvailable = true;
                        }

                        break;
                    }
                    // Zero bytes without EOS: go around again and wait for more data.
                }
            } catch (final ProducerException e) {
                mLog.exception(e, "Reader threw an exception");
                throw new IOException(e);
            }
        }
    } finally {
        if (interrupted) {
            // Re-assert the interrupt swallowed while waiting.
            Thread.currentThread().interrupt();
        }
    }

    mLog.debug("Streamed %d bytes for stream %s with uploadHandle %d", bytesRead, mStreamInfo.getName(),
            mUploadHandle);

    if (-1 == bytesRead) {
        mLog.debug("Closing stream %s with uploadHandle %d", mStreamInfo.getName(), mUploadHandle);
    }

    return bytesRead;
}