private void startLiveVideoStreaming()

in src/main/java/com/aws/iot/edgeconnectorforkvs/EdgeConnectorForKVSService.java [479:591]


    private void startLiveVideoStreaming(EdgeConnectorForKVSConfiguration edgeConnectorForKVSConfiguration)
            throws IOException, InterruptedException {
        ReentrantLock processLock = edgeConnectorForKVSConfiguration.getProcessLock();
        try {
            if (processLock.tryLock(
                    INIT_LOCK_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)) {
                log.info("Start Live Video Streaming Called for " +
                        edgeConnectorForKVSConfiguration.getKinesisVideoStreamName());
                log.info("Calling function " + Constants.getCallingFunctionName(2));
                edgeConnectorForKVSConfiguration.setLiveStreamingRequestsCount(edgeConnectorForKVSConfiguration
                        .getLiveStreamingRequestsCount() + 1);
                if (edgeConnectorForKVSConfiguration.getLiveStreamingRequestsCount() > 1) {
                    log.info("Live Streaming already running. Requests Count: " +
                            edgeConnectorForKVSConfiguration.getLiveStreamingRequestsCount());
                    return;
                }
            } else {
                log.error("Start uploading for " + edgeConnectorForKVSConfiguration.getKinesisVideoStreamName()
                        + " timeout, re-init component to restart the process.");
                Constants.setFatalStatus(true);
            }
        } catch (InterruptedException e) {
            log.error("Start uploading for " + edgeConnectorForKVSConfiguration.getKinesisVideoStreamName()
                    + " has been interrupted, re-init component to restart the process.");
            Constants.setFatalStatus(true);
        } finally {
            if (processLock.isHeldByCurrentThread()) processLock.unlock();
        }

        // kick-off recording if it wasn't already started
        Future<?> future = recorderService.submit(() -> {
            startRecordingJob(edgeConnectorForKVSConfiguration);
        });
        try {
            // startRecordingJob is a blocking call, so we wait
            // upto 5 seconds for the recording to start before
            // we start live streaming below
            future.get(RECORDING_JOB_WAIT_TIME_IN_SECS, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            log.error("Start Live Streaming Interrupted Exception: " + ex.getMessage());
        } catch (ExecutionException ex) {
            log.error("Start Live Streaming Execution Exception: " + ex.getMessage());
        } catch (TimeoutException ex) {
            // Ignore this exception, it is expected since
            // startRecordingJob is a blocking call
        }

        VideoRecorder videoRecorder = edgeConnectorForKVSConfiguration.getVideoRecorder();
        VideoUploader videoUploader = edgeConnectorForKVSConfiguration.getVideoUploader();

        do {
            PipedOutputStream outputStream = new PipedOutputStream();
            PipedInputStream inputStream = new PipedInputStream();

            // Toggle to false before switching outputStream (may not be required)
            videoRecorder.toggleAppDataOutputStream(false);

            edgeConnectorForKVSConfiguration.setOutputStream(outputStream);
            edgeConnectorForKVSConfiguration.setInputStream(inputStream);
            outputStream.connect(inputStream);
            videoRecorder.setAppDataOutputStream(outputStream);

            log.info("Connected streams for KVS Stream: " +
                    edgeConnectorForKVSConfiguration.getKinesisVideoStreamName());
            videoRecorder.toggleAppDataOutputStream(true);

            log.info("Turned on outputStream in recorder and start uploading!");
            Date dateNow = new Date();
            try {
                videoUploader.uploadStream(inputStream, dateNow, new StatusChangedCallBack(),
                        new UploadCallBack(dateNow, edgeConnectorForKVSConfiguration));
            } catch (Exception exception) {
                if (processLock.tryLock(INIT_LOCK_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)) {
                    log.error("Failed to upload stream: {}", exception.getMessage());

                    AtomicBoolean isRecorderToggleOff = new AtomicBoolean();
                    Thread toggleRecorderOffThreaed = new Thread(() -> {
                        log.info("Waiting for toggling recorder off");
                        videoRecorder.toggleAppDataOutputStream(false);
                        try {
                            TimeUnit.MILLISECONDS.sleep(2000);
                        } catch (InterruptedException e) {
                            log.error("toggleRecorderOffThread exception: " + e.getMessage());
                        }
                        isRecorderToggleOff.set(true);
                        log.info("Toggling recorder off");
                    });

                    toggleRecorderOffThreaed.start();
                    log.info("InputStream is flushing");
                    try {
                        int bytesAvailable = inputStream.available();
                        while (!isRecorderToggleOff.get() || bytesAvailable > 0) {
                            byte[] b = new byte[bytesAvailable];
                            inputStream.read(b);
                            bytesAvailable = inputStream.available();
                        }
                    } catch (IOException e) {
                        log.error("Exception flush intputStream: " + e.getMessage());
                    }
                    log.info("InputStream is flushed");

                    outputStream.close();
                    inputStream.close();
                } else {
                    log.error("Restart uploading for " + edgeConnectorForKVSConfiguration.getKinesisVideoStreamName()
                            + " timeout, re-init component to restart the process.");
                    Constants.setFatalStatus(true);
                    break;
                }
            }
        } while (retryOnFail && edgeConnectorForKVSConfiguration.getLiveStreamingRequestsCount() > 0);
    }