in src/main/java/com/amazonaws/kinesisvideo/internal/service/AckConsumer.java [51:91]
/**
 * Reads ACK data from {@code ackStream} in a loop and hands each received chunk to
 * {@code stream.parseFragmentAck} until an end-of-stream condition is observed or
 * {@code closed} is set.
 *
 * <p>The loop ends (by setting {@code closed} to {@code true}) when any of the following holds
 * after a read: the stream handle equals
 * {@code NativeKinesisVideoProducerJni.INVALID_STREAM_HANDLE_VALUE}, the read returned no bytes
 * ({@code <= 0}), or the received text equals {@code END_OF_STREAM_MSG}.
 *
 * <p>A {@code ProducerException} thrown while parsing an ACK is logged and the loop continues.
 * An {@code IOException} from the read is logged and terminates processing. In every case
 * {@code stoppedLatch} is counted down before the method returns.
 */
private void processAckInputStream() {
    Preconditions.checkNotNull(stream);
    final byte[] buffer = new byte[FOUR_KB];
    log.info("Starting ACK processing");
    try {
        while (!closed) {
            // This is a blocking operation
            final int bytesRead = ackStream.read(buffer);

            // Only decode when something was actually read; otherwise there is no text to inspect.
            final String ackText = bytesRead > 0
                    ? new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)
                    : null;

            // Check for end-of-stream and 0 before processing.
            // END_OF_STREAM_MSG.equals(null) is false, so a null ackText is safe here.
            final boolean endOfStream =
                    stream.getStreamHandle() == NativeKinesisVideoProducerJni.INVALID_STREAM_HANDLE_VALUE
                            || bytesRead <= 0
                            || END_OF_STREAM_MSG.equals(ackText);

            if (endOfStream) {
                log.debug("Received end-of-stream for ACKs.");
                closed = true;
            } else {
                // Reaching this branch implies bytesRead > 0, so ackText is non-null.
                log.debug("Received ACK bits: " + ackText);
                try {
                    stream.parseFragmentAck(uploadHandle, ackText);
                } catch (final ProducerException e) {
                    // A single bad ACK must not stop the consumer: log it and keep reading.
                    log.exception(e, "Processing ACK threw an exception. Logging and continuing. ");
                }
            }
        }
        log.debug("Finished reading ACKs stream");
    } catch (final IOException e) {
        // Log and exit
        log.exception(e);
    } finally {
        // Always signal that processing has stopped, regardless of how the loop exited.
        stoppedLatch.countDown();
    }
}