in polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java [301:384]
protected void testLoadTasks() {
for (int i = 0; i < 20; i++) {
polarisTestMetaStoreManager.createEntity(
null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
}
String executorId = "testExecutor_abc";
PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager;
PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext;
List<PolarisBaseEntity> taskList =
metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
Assertions.assertThat(taskList)
.isNotNull()
.isNotEmpty()
.hasSize(5)
.allSatisfy(
entry ->
Assertions.assertThat(entry)
.extracting(
e ->
PolarisObjectMapperUtil.deserializeProperties(
callCtx, e.getProperties()))
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry("lastAttemptExecutorId", executorId)
.containsEntry("attemptCount", "1"));
Set<String> firstTasks =
taskList.stream().map(PolarisBaseEntity::getName).collect(Collectors.toSet());
// grab a second round of tasks. Assert that none of the original 5 are in the list
List<PolarisBaseEntity> newTaskList =
metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
Assertions.assertThat(newTaskList)
.isNotNull()
.isNotEmpty()
.hasSize(5)
.extracting(PolarisBaseEntity::getName)
.noneMatch(firstTasks::contains);
Set<String> firstTenTaskNames =
Stream.concat(firstTasks.stream(), newTaskList.stream().map(PolarisBaseEntity::getName))
.collect(Collectors.toSet());
// only 10 tasks are unassigned. Requesting 20, we should only receive those 10
List<PolarisBaseEntity> lastTen =
metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
Assertions.assertThat(lastTen)
.isNotNull()
.isNotEmpty()
.hasSize(10)
.extracting(PolarisBaseEntity::getName)
.noneMatch(firstTenTaskNames::contains);
Set<String> allTaskNames =
Stream.concat(firstTenTaskNames.stream(), lastTen.stream().map(PolarisBaseEntity::getName))
.collect(Collectors.toSet());
List<PolarisBaseEntity> emtpyList =
metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
Assertions.assertThat(emtpyList).isNotNull().isEmpty();
timeSource.add(Duration.ofMinutes(10));
// all the tasks are unassigned. Fetch them all
List<PolarisBaseEntity> allTasks =
metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
Assertions.assertThat(allTasks)
.isNotNull()
.isNotEmpty()
.hasSize(20)
.extracting(PolarisBaseEntity::getName)
.allMatch(allTaskNames::contains);
// drop all the tasks. Skip the clock forward and fetch. empty list expected
allTasks.forEach(
entity -> metaStoreManager.dropEntityIfExists(callCtx, null, entity, Map.of(), false));
timeSource.add(Duration.ofMinutes(10));
List<PolarisBaseEntity> finalList =
metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
Assertions.assertThat(finalList).isNotNull().isEmpty();
}