compat/maven-embedder/src/main/java/org/apache/maven/cli/transfer/SimplexTransferListener.java [104:228]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private void demux(List<Exchange> exchanges) {
        for (Exchange exchange : exchanges) {
            exchange.process(transferEvent -> {
                TransferEvent.EventType type = transferEvent.getType();
                try {
                    switch (type) {
                        case INITIATED:
                            delegate.transferInitiated(transferEvent);
                            break;
                        case STARTED:
                            delegate.transferStarted(transferEvent);
                            break;
                        case PROGRESSED:
                            delegate.transferProgressed(transferEvent);
                            break;
                        case CORRUPTED:
                            delegate.transferCorrupted(transferEvent);
                            break;
                        case SUCCEEDED:
                            delegate.transferSucceeded(transferEvent);
                            break;
                        case FAILED:
                            delegate.transferFailed(transferEvent);
                            break;
                        default:
                            LOGGER.warn("Invalid TransferEvent.EventType={}; ignoring it", type);
                    }
                } catch (TransferCancelledException e) {
                    ongoing.put(new TransferResourceIdentifier(transferEvent.getResource()), Boolean.FALSE);
                }
            });
        }
    }

    private void put(TransferEvent event, boolean last) {
        try {
            Exchange exchange;
            if (blockOnLastEvent && last) {
                exchange = new BlockingExchange(event);
            } else {
                exchange = new Exchange(event);
            }
            eventQueue.put(exchange);
            exchange.waitForProcessed();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private final ConcurrentHashMap<TransferResourceIdentifier, Boolean> ongoing = new ConcurrentHashMap<>();

    @Override
    public void transferInitiated(TransferEvent event) {
        ongoing.putIfAbsent(new TransferResourceIdentifier(event.getResource()), Boolean.TRUE);
        put(event, false);
    }

    @Override
    public void transferStarted(TransferEvent event) throws TransferCancelledException {
        if (ongoing.get(new TransferResourceIdentifier(event.getResource())) == Boolean.FALSE) {
            throw new TransferCancelledException();
        }
        put(event, false);
    }

    @Override
    public void transferProgressed(TransferEvent event) throws TransferCancelledException {
        if (ongoing.get(new TransferResourceIdentifier(event.getResource())) == Boolean.FALSE) {
            throw new TransferCancelledException();
        }
        put(event, false);
    }

    @Override
    public void transferCorrupted(TransferEvent event) throws TransferCancelledException {
        if (ongoing.get(new TransferResourceIdentifier(event.getResource())) == Boolean.FALSE) {
            throw new TransferCancelledException();
        }
        put(event, false);
    }

    @Override
    public void transferSucceeded(TransferEvent event) {
        ongoing.remove(new TransferResourceIdentifier(event.getResource()));
        put(event, ongoing.isEmpty());
    }

    @Override
    public void transferFailed(TransferEvent event) {
        ongoing.remove(new TransferResourceIdentifier(event.getResource()));
        put(event, ongoing.isEmpty());
    }

    private static class Exchange {
        private final TransferEvent event;

        private Exchange(TransferEvent event) {
            this.event = event;
        }

        public void process(Consumer<TransferEvent> consumer) {
            consumer.accept(event);
        }

        public void waitForProcessed() throws InterruptedException {
            // nothing, is async
        }
    }

    private static class BlockingExchange extends Exchange {
        private final CountDownLatch latch = new CountDownLatch(1);

        private BlockingExchange(TransferEvent event) {
            super(event);
        }

        @Override
        public void process(Consumer<TransferEvent> consumer) {
            super.process(consumer);
            latch.countDown();
        }

        @Override
        public void waitForProcessed() throws InterruptedException {
            latch.await();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



impl/maven-cli/src/main/java/org/apache/maven/cling/transfer/SimplexTransferListener.java [114:238]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private void demux(List<Exchange> exchanges) {
        for (Exchange exchange : exchanges) {
            exchange.process(transferEvent -> {
                TransferEvent.EventType type = transferEvent.getType();
                try {
                    switch (type) {
                        case INITIATED:
                            delegate.transferInitiated(transferEvent);
                            break;
                        case STARTED:
                            delegate.transferStarted(transferEvent);
                            break;
                        case PROGRESSED:
                            delegate.transferProgressed(transferEvent);
                            break;
                        case CORRUPTED:
                            delegate.transferCorrupted(transferEvent);
                            break;
                        case SUCCEEDED:
                            delegate.transferSucceeded(transferEvent);
                            break;
                        case FAILED:
                            delegate.transferFailed(transferEvent);
                            break;
                        default:
                            LOGGER.warn("Invalid TransferEvent.EventType={}; ignoring it", type);
                    }
                } catch (TransferCancelledException e) {
                    ongoing.put(new TransferResourceIdentifier(transferEvent.getResource()), Boolean.FALSE);
                }
            });
        }
    }

    private void put(TransferEvent event, boolean last) {
        try {
            Exchange exchange;
            if (blockOnLastEvent && last) {
                exchange = new BlockingExchange(event);
            } else {
                exchange = new Exchange(event);
            }
            eventQueue.put(exchange);
            exchange.waitForProcessed();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private final ConcurrentHashMap<TransferResourceIdentifier, Boolean> ongoing = new ConcurrentHashMap<>();

    @Override
    public void transferInitiated(TransferEvent event) {
        ongoing.putIfAbsent(new TransferResourceIdentifier(event.getResource()), Boolean.TRUE);
        put(event, false);
    }

    @Override
    public void transferStarted(TransferEvent event) throws TransferCancelledException {
        if (ongoing.get(new TransferResourceIdentifier(event.getResource())) == Boolean.FALSE) {
            throw new TransferCancelledException();
        }
        put(event, false);
    }

    @Override
    public void transferProgressed(TransferEvent event) throws TransferCancelledException {
        if (ongoing.get(new TransferResourceIdentifier(event.getResource())) == Boolean.FALSE) {
            throw new TransferCancelledException();
        }
        put(event, false);
    }

    @Override
    public void transferCorrupted(TransferEvent event) throws TransferCancelledException {
        if (ongoing.get(new TransferResourceIdentifier(event.getResource())) == Boolean.FALSE) {
            throw new TransferCancelledException();
        }
        put(event, false);
    }

    @Override
    public void transferSucceeded(TransferEvent event) {
        ongoing.remove(new TransferResourceIdentifier(event.getResource()));
        put(event, ongoing.isEmpty());
    }

    @Override
    public void transferFailed(TransferEvent event) {
        ongoing.remove(new TransferResourceIdentifier(event.getResource()));
        put(event, ongoing.isEmpty());
    }

    private static class Exchange {
        private final TransferEvent event;

        private Exchange(TransferEvent event) {
            this.event = event;
        }

        public void process(Consumer<TransferEvent> consumer) {
            consumer.accept(event);
        }

        public void waitForProcessed() throws InterruptedException {
            // nothing, is async
        }
    }

    private static class BlockingExchange extends Exchange {
        private final CountDownLatch latch = new CountDownLatch(1);

        private BlockingExchange(TransferEvent event) {
            super(event);
        }

        @Override
        public void process(Consumer<TransferEvent> consumer) {
            super.process(consumer);
            latch.countDown();
        }

        @Override
        public void waitForProcessed() throws InterruptedException {
            latch.await();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



