in functions/source/amazon-chime-recordandtranscribe/src/main/java/com/amazonaws/kvstranscribestreaming/transcribe/KVSByteToAudioEventSubscription.java [52:88]
/**
 * Registers additional demand from the subscriber and schedules a task that
 * reads audio chunks and emits them as {@code AudioEvent}s.
 *
 * <p>The scheduled task loops while the outstanding demand is positive. Each
 * non-empty chunk is passed to {@code subscriber.onNext(...)} and, when
 * {@code shouldWriteToOutputStream} is set, also written to
 * {@code outputStream}. An empty chunk triggers {@code subscriber.onComplete()}
 * and ends the loop. Any exception raised inside the task is forwarded to
 * {@code subscriber.onError(...)}.
 *
 * @param n number of additional events requested; must be strictly positive.
 *          A non-positive value is reported through {@code subscriber.onError}
 *          with an {@link IllegalArgumentException}, and nothing else happens:
 *          the demand counter is left untouched and no task is scheduled.
 */
public void request(long n) {
    if (n <= 0) {
        subscriber.onError(new IllegalArgumentException("Demand must be positive"));
        // onError is a terminal signal: stop here so the invalid value is not
        // added to the demand counter and no further signals are emitted.
        return;
    }
    demand.getAndAdd(n);
    // We need to invoke this in a separate thread because the call to
    // subscriber.onNext(...) is recursive
    executor.submit(() -> {
        try {
            while (demand.get() > 0) {
                ByteBuffer audioBuffer = KVSUtils.getByteBufferFromStream(streamingMkvReader, fragmentVisitor,
                        tagProcessor, CHUNK_SIZE_IN_KB);
                if (audioBuffer.remaining() > 0) {
                    AudioEvent audioEvent = audioEventFromBuffer(audioBuffer);
                    subscriber.onNext(audioEvent);
                    if (shouldWriteToOutputStream) {
                        // Write audioBytes to a temporary file as they are received from the stream
                        // NOTE(review): this relies on audioEventFromBuffer(...) not advancing the
                        // buffer position; otherwise remaining() would be 0 here - confirm.
                        byte[] audioBytes = new byte[audioBuffer.remaining()];
                        audioBuffer.get(audioBytes);
                        outputStream.write(audioBytes);
                    }
                } else {
                    // An empty chunk is treated as end of stream.
                    subscriber.onComplete();
                    break;
                }
                // One unit of demand is consumed per emitted event.
                demand.getAndDecrement();
            }
        } catch (Exception e) {
            subscriber.onError(e);
        }
    });
}