public Closeable forwardPortAsync()

in core/src/main/java/io/fabric8/maven/core/service/PortForwardService.java [70:192]


    /**
     * Starts a background port-forward from {@code localPort} to {@code remotePort} of a pod matching
     * {@code podSelector}, and keeps following the selected pod as watch events arrive.
     * <p>
     * The pod to forward to is initially the result of {@code getNewestPod(podSelector)} (may be {@code null}).
     * A watch on the pods matching the selector then re-evaluates the target on every event by passing the
     * current target and the pod carried by the event to {@code getNewestPod}. A dedicated thread closes the
     * previous port-forward and opens a new one whenever the target changes.
     * <p>
     * A JVM shutdown hook is registered so the forwarding is stopped when the JVM exits; the hook is
     * deregistered again when the returned handle is closed.
     *
     * @param externalProcessLogger logger handed to the per-pod {@code forwardPortAsync} overload
     * @param podSelector           selector used for the initial pod lookup and for the pod watch
     * @param remotePort            port on the pod side
     * @param localPort             port on the local side
     * @return a handle whose {@code close()} closes the watch, interrupts the forwarding thread and waits
     *         up to 15 seconds for it to finish
     * @throws Fabric8ServiceException declared by this method; NOTE(review): presumably raised by the initial
     *                                 pod lookup - confirm against {@code getNewestPod}
     */
    public Closeable forwardPortAsync(final Logger externalProcessLogger, final LabelSelector podSelector, final int remotePort, final int localPort) throws Fabric8ServiceException {

        // Fair lock guarding nextForwardedPod[0]; podChanged is signalled whenever that slot is replaced.
        final Lock monitor = new ReentrantLock(true);
        final Condition podChanged = monitor.newCondition();
        // One-element array used as a mutable holder shared by the watcher callback and the forwarder thread.
        final Pod[] nextForwardedPod = new Pod[1];

        final Thread forwarderThread = new Thread() {
            @Override
            public void run() {

                Pod currentPod = null;
                Closeable currentPortForward = null;

                // Acquire the lock BEFORE the try block so that the finally clause below can never
                // attempt to unlock a monitor this thread failed to acquire.
                monitor.lock();
                try {
                    while (true) {
                        if (podEquals(currentPod, nextForwardedPod[0])) {
                            // Nothing to do until the watcher publishes a different pod.
                            podChanged.await();
                        } else {
                            Pod nextPod = nextForwardedPod[0]; // may be null
                            try {
                                // Release the lock while doing the (potentially slow) close/open work so
                                // that watcher callbacks are not blocked.
                                monitor.unlock();
                                // out of critical section

                                if (currentPortForward != null) {
                                    log.info("Closing port-forward from pod %s", KubernetesHelper.getName(currentPod));
                                    currentPortForward.close();
                                    currentPortForward = null;
                                }

                                if (nextPod != null) {
                                    log.info("Starting port-forward to pod %s", KubernetesHelper.getName(nextPod));
                                    currentPortForward = forwardPortAsync(externalProcessLogger, KubernetesHelper.getName(nextPod), remotePort, localPort);
                                } else {
                                    log.info("Waiting for a pod to become ready before starting port-forward");
                                }
                                currentPod = nextPod;
                            } finally {
                                // Always re-enter the critical section, even on failure, so the outer
                                // finally finds the lock held by this thread.
                                monitor.lock();
                            }
                        }

                    }

                } catch (InterruptedException e) {
                    log.debug("Port-forwarding thread interrupted", e);
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    log.warn("Error while port-forwarding to pod", e);
                } finally {
                    monitor.unlock();

                    if (currentPortForward != null) {
                        try {
                            currentPortForward.close();
                        } catch (Exception e) {
                            // Best effort: the thread is terminating anyway, but leave a trace.
                            log.debug("Error while closing port-forward: %s", e.getMessage());
                        }
                    }
                }
            }
        };

        // Switching forward to the current pod if present
        Pod newPod = getNewestPod(podSelector);
        // Written before the watch is created and before the thread is started, so no locking is needed yet.
        nextForwardedPod[0] = newPod;

        final Watch watch = KubernetesClientUtil.withSelector(kubernetes.pods(), podSelector, log).watch(new Watcher<Pod>() {

            @Override
            public void eventReceived(Action action, Pod pod) {
                monitor.lock();
                try {
                    // Candidates are the currently selected pod (if any) and the pod from this event.
                    List<Pod> candidatePods;
                    if (nextForwardedPod[0] != null) {
                        candidatePods = new LinkedList<>();
                        candidatePods.add(nextForwardedPod[0]);
                        candidatePods.add(pod);
                    } else {
                        candidatePods = Collections.singletonList(pod);
                    }
                    Pod newPod = getNewestPod(candidatePods); // may be null
                    if (!podEquals(nextForwardedPod[0], newPod)) {
                        nextForwardedPod[0] = newPod;
                        // The forwarder thread is the only waiter on this condition.
                        podChanged.signal();
                    }
                } finally {
                    monitor.unlock();
                }
            }

            @Override
            public void onClose(KubernetesClientException e) {
                // don't care
            }
        });

        forwarderThread.start();

        // Shared stop logic used both by the returned handle and by the JVM shutdown hook.
        final Closeable stopForwarding = new Closeable() {
            @Override
            public void close() throws IOException {
                try {
                    watch.close();
                } catch (Exception e) {
                    // Best effort: still proceed to stop the forwarder thread.
                    log.debug("Error while closing pod watch: %s", e.getMessage());
                }
                try {
                    forwarderThread.interrupt();
                    forwarderThread.join(15000);
                } catch (InterruptedException e) {
                    // Do not swallow the caller's interruption: restore the flag for upstream code.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // Best effort (e.g. interrupt() not permitted): nothing more can be done here.
                }
            }
        };

        final Thread shutdownHook = new Thread() {
            @Override
            public void run() {
                try {
                    stopForwarding.close();
                } catch (Exception e) {
                    // suppress
                }
            }
        };
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        return new Closeable() {
            @Override
            public void close() throws IOException {
                try {
                    stopForwarding.close();
                } finally {
                    // Deregister the hook so that it (and the watch/thread it captures) is not retained
                    // for the remaining lifetime of the JVM after an explicit close.
                    try {
                        Runtime.getRuntime().removeShutdownHook(shutdownHook);
                    } catch (IllegalStateException | SecurityException ignored) {
                        // JVM is already shutting down (the hook will run or has run) or removal is not permitted.
                    }
                }
            }
        };
    }