private List<byte[]> waitAndPrefixGetFromPutEvent()

in computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java [336:381]


    /**
     * Watch the keys under {@code prefixSeq} and block until the tracked
     * key set reaches {@code count} entries.
     *
     * <p>The tracked set is seeded from {@code existedKeyValues}. Every PUT
     * event reported by the watch adds or overwrites an entry, every DELETE
     * event removes one. Right after a PUT, if the set holds exactly
     * {@code count} entries, their values are handed to the waiting side
     * through the barrier event.
     *
     * @param prefixSeq        key prefix to watch
     * @param count            number of distinct keys to wait for
     * @param existedKeyValues key-values used to seed the tracked set
     * @param revision         revision the watch is started from
     * @param timeout          passed through to the barrier wait; logged as
     *                         milliseconds
     * @param logInterval      passed through to the barrier wait together
     *                         with the progress-logging callback
     * @return whatever the barrier event's {@code await} returns; on the
     *         signalled path this is the list of values (as byte arrays)
     *         built by the watch callback
     * @throws InterruptedException declared for the barrier wait
     */
    private List<byte[]> waitAndPrefixGetFromPutEvent(
                         ByteSequence prefixSeq, int count,
                         List<KeyValue> existedKeyValues,
                         long revision, long timeout, long logInterval)
                         throws InterruptedException {
        // Concurrent map: it is written from the watch callback and read
        // from the progress logger below.
        Map<ByteSequence, ByteSequence> collected = new ConcurrentHashMap<>();
        existedKeyValues.forEach(kv -> collected.put(kv.getKey(),
                                                     kv.getValue()));

        WatchOption option = WatchOption.newBuilder()
                                        .withPrefix(prefixSeq)
                                        .withRevision(revision)
                                        .build();

        WaitEvent<List<byte[]>> barrier = new WaitEvent<>();
        Consumer<WatchResponse> onResponse = response -> {
            for (WatchEvent event : response.getEvents()) {
                EventType type = event.getEventType();
                if (EventType.DELETE.equals(type)) {
                    collected.remove(event.getKeyValue().getKey());
                    continue;
                }
                if (!EventType.PUT.equals(type)) {
                    // NOTE(review): this is raised inside the watch callback;
                    // confirm that it actually reaches the waiting caller.
                    throw new ComputerException("Unexpected event type '%s'",
                                                type);
                }

                KeyValue put = event.getKeyValue();
                collected.put(put.getKey(), put.getValue());
                // NOTE(review): only an exact match right after a PUT
                // signals; presumably the caller guarantees fewer than
                // 'count' seeded keys - confirm.
                if (collected.size() != count) {
                    continue;
                }
                List<byte[]> values = new ArrayList<>(count);
                collected.values()
                         .forEach(value -> values.add(value.getBytes()));
                barrier.signalAll(values);
            }
        };

        // The watcher is closed as soon as the wait finishes, normally or not
        try (Watch.Watcher watcher = this.watch.watch(prefixSeq, option,
                                                      onResponse)) {
            Runnable progressLogger = () -> {
                LOG.info("Wait for keys with prefix '{}' and timeout {}ms, " +
                         "expect {} keys but actual got {} keys",
                         prefixSeq.toString(ENCODING),
                         timeout, count, collected.size());
            };
            return barrier.await(timeout, logInterval, progressLogger);
        }
    }