in lca-ai-stack/source/kvs_transcribe_streaming/src/main/java/com/amazonaws/transcribestreaming/KVSByteToAudioEventSubscription.java [69:102]
public void request(long n) {
if (n <= 0) {
subscriber.onError(new IllegalArgumentException("Demand must be positive"));
}
demand.getAndAdd(n);
//We need to invoke this in a separate thread because the call to subscriber.onNext(...) is recursive
executor.submit(() -> {
try {
while (demand.get() > 0) {
// return byteBufferDetails and consume this with an input stream then feed to output stream
ByteBuffer audioBuffer = KVSUtils.getByteBufferFromStream(streamingMkvReader, fragmentVisitor, tagProcessor, callId, CHUNK_SIZE_IN_KB, track);
if (audioBuffer.remaining() > 0) {
AudioEvent audioEvent = audioEventFromBuffer(audioBuffer);
subscriber.onNext(audioEvent);
//Write audioBytes to a temporary file as they are received from the stream
byte[] audioBytes = new byte[audioBuffer.remaining()];
audioBuffer.get(audioBytes);
outputStream.write(audioBytes);
} else {
subscriber.onComplete();
break;
}
demand.getAndDecrement();
}
} catch (Exception e) {
subscriber.onError(e);
}
});
}