in polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java [387:443]
protected void testLoadTasksInParallel() throws Exception {
for (int i = 0; i < 100; i++) {
polarisTestMetaStoreManager.createEntity(
null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
}
PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager;
PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext;
List<Future<Set<String>>> futureList = new ArrayList<>();
ExecutorService executorService = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 3; i++) {
final String executorId = "taskExecutor_" + i;
futureList.add(
executorService.submit(
() -> {
Set<String> taskNames = new HashSet<>();
List<PolarisBaseEntity> taskList = List.of();
boolean retry = false;
do {
retry = false;
try {
taskList = metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
taskList.stream().map(PolarisBaseEntity::getName).forEach(taskNames::add);
} catch (RetryOnConcurrencyException e) {
retry = true;
}
} while (retry || !taskList.isEmpty());
return taskNames;
}));
}
} finally {
executorService.shutdown();
Assertions.assertThat(executorService.awaitTermination(30, TimeUnit.SECONDS)).isTrue();
}
List<Set<String>> responses =
futureList.stream()
.map(
f -> {
try {
return f.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
Assertions.assertThat(responses)
.hasSize(3)
.satisfies(l -> Assertions.assertThat(l.stream().flatMap(Set::stream)).hasSize(100));
Map<String, Integer> taskCounts =
responses.stream()
.flatMap(Set::stream)
.collect(Collectors.toMap(Function.identity(), (val) -> 1, Integer::sum));
Assertions.assertThat(taskCounts)
.hasSize(100)
.allSatisfy((k, v) -> Assertions.assertThat(v).isEqualTo(1));
}