in computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java [336:381]
/**
 * Waits until {@code count} keys are known under the given prefix and
 * returns their values as byte arrays.
 *
 * <p>The set of known keys starts from {@code existedKeyValues} and is then
 * kept up to date by a watch on {@code prefixSeq} opened at
 * {@code revision}: a PUT event adds or overwrites a key, a DELETE event
 * removes it. Each time a PUT leaves exactly {@code count} keys in the set,
 * the waiting caller is signalled with the current values.
 *
 * <p>NOTE(review): the size check runs only after a PUT event, so nothing is
 * signalled for the initial content of {@code existedKeyValues} by itself;
 * presumably the caller handles the "already complete" case - confirm.
 *
 * @param prefixSeq        key prefix to watch
 * @param count            number of keys that must be present
 * @param existedKeyValues key-values already read before the watch starts
 * @param revision         revision the watch is opened with
 * @param timeout          wait timeout handed to the wait event (logged
 *                         as milliseconds)
 * @param logInterval      interval handed to the wait event for invoking
 *                         the progress-log callback
 * @return the value produced by the wait event; when signalled from here
 *         it is the list of value bytes of the collected keys
 * @throws InterruptedException presumably when the waiting thread is
 *                              interrupted inside {@code await} - confirm
 *                              against WaitEvent
 */
private List<byte[]> waitAndPrefixGetFromPutEvent(
        ByteSequence prefixSeq, int count,
        List<KeyValue> existedKeyValues,
        long revision, long timeout, long logInterval)
        throws InterruptedException {
    // Key -> value view of everything currently known under the prefix;
    // written by the watch callback and read by the logging callback.
    Map<ByteSequence, ByteSequence> collected = new ConcurrentHashMap<>();
    existedKeyValues.forEach(
            existed -> collected.put(existed.getKey(), existed.getValue()));

    WatchOption option = WatchOption.newBuilder()
                                    .withPrefix(prefixSeq)
                                    .withRevision(revision)
                                    .build();

    WaitEvent<List<byte[]>> barrier = new WaitEvent<>();
    Consumer<WatchResponse> onResponse = response -> {
        for (WatchEvent watchEvent : response.getEvents()) {
            EventType type = watchEvent.getEventType();

            if (EventType.DELETE.equals(type)) {
                collected.remove(watchEvent.getKeyValue().getKey());
                continue;
            }
            if (!EventType.PUT.equals(type)) {
                throw new ComputerException("Unexpected event type '%s'",
                                            type);
            }

            KeyValue put = watchEvent.getKeyValue();
            collected.put(put.getKey(), put.getValue());
            if (collected.size() != count) {
                continue;
            }
            // Exactly the expected number of keys: hand the values over.
            List<byte[]> values = new ArrayList<>(count);
            collected.values().forEach(
                    value -> values.add(value.getBytes()));
            barrier.signalAll(values);
        }
    };

    // The watcher is only held so that it is closed once waiting ends.
    try (Watch.Watcher ignored = this.watch.watch(prefixSeq, option,
                                                  onResponse)) {
        return barrier.await(timeout, logInterval, () -> {
            LOG.info("Wait for keys with prefix '{}' and timeout {}ms, " +
                     "expect {} keys but actual got {} keys",
                     prefixSeq.toString(ENCODING),
                     timeout, count, collected.size());
        });
    }
}