public void request(long n)

in src/main/java/com/amazonaws/kvstranscribestreaming/transcribe/KVSByteToAudioEventSubscription.java [52:88]


    /**
     * Registers additional demand and asynchronously delivers audio events to the subscriber.
     *
     * <p>A non-positive {@code n} is reported through {@code subscriber.onError} with an
     * {@link IllegalArgumentException}; the request is then ignored, so the demand counter is
     * not modified and no work is scheduled.
     *
     * <p>Otherwise {@code n} is added to the outstanding demand and a task is submitted to
     * {@code executor}. While demand is positive the task reads a chunk through
     * {@code KVSUtils.getByteBufferFromStream}, delivers it via {@code subscriber.onNext},
     * optionally writes the same bytes to {@code outputStream}, and decrements the demand.
     * A chunk with no remaining bytes results in {@code subscriber.onComplete()} and ends the
     * loop. Any exception thrown inside the task is forwarded to {@code subscriber.onError}.
     *
     * @param n number of additional items requested; must be positive
     */
    public void request(long n) {
        if (n <= 0) {
            subscriber.onError(new IllegalArgumentException("Demand must be positive"));
            // Stop here: adding a non-positive value would corrupt the demand counter, and
            // scheduling the task could emit further signals after onError.
            return;
        }

        demand.getAndAdd(n);
        // We need to invoke this in a separate thread because the call to
        // subscriber.onNext(...) is recursive
        executor.submit(() -> {
            try {
                while (demand.get() > 0) {
                    ByteBuffer audioBuffer = KVSUtils.getByteBufferFromStream(streamingMkvReader, fragmentVisitor,
                            tagProcessor, CHUNK_SIZE_IN_KB);

                    if (audioBuffer.remaining() > 0) {

                        AudioEvent audioEvent = audioEventFromBuffer(audioBuffer);
                        subscriber.onNext(audioEvent);

                        if (shouldWriteToOutputStream) {
                            // Persist the audio bytes to outputStream as they are received from the stream.
                            // NOTE(review): this relies on audioEventFromBuffer not advancing the
                            // buffer's position; otherwise remaining() is 0 here - confirm.
                            byte[] audioBytes = new byte[audioBuffer.remaining()];
                            audioBuffer.get(audioBytes);
                            outputStream.write(audioBytes);
                        }

                    } else {
                        // An empty chunk is treated as end of stream.
                        subscriber.onComplete();
                        break;
                    }
                    demand.getAndDecrement();
                }
            } catch (Exception e) {
                subscriber.onError(e);
            }
        });
    }