in frontend/server/src/main/java/com/amazonaws/ml/mms/wlm/Model.java [134:183]
/**
 * Fills {@code jobsRepo} with the next job(s) a worker should process.
 *
 * <p>If the queue registered in {@code jobsDb} under {@code threadId} is non-empty, this waits
 * up to {@code waitTime} milliseconds for one job from it, stores that job and returns.
 * If that poll comes back empty (or no such queue exists), the method falls through to the
 * queue registered under {@code DEFAULT_DATA_QUEUE}: while holding {@code lock} it blocks
 * until a first job is available, then collects up to {@code batchSize - 1} further jobs,
 * spending at most {@code maxBatchDelay} milliseconds in total waiting for them.
 *
 * <p>Every collected job is stored in {@code jobsRepo} keyed by {@code Job#getJobId()}.
 *
 * @param threadId identifier used to look up the caller's own queue; must be non-null and
 *     non-empty
 * @param waitTime maximum time, in milliseconds, to wait on the caller's own queue
 * @param jobsRepo output map that receives the polled jobs; must be non-null and empty
 * @throws IllegalArgumentException if {@code jobsRepo} is null or not empty, or if
 *     {@code threadId} is null or empty
 * @throws InterruptedException if the thread is interrupted while waiting for the lock or
 *     for a job
 */
public void pollBatch(String threadId, long waitTime, Map<String, Job> jobsRepo)
        throws InterruptedException {
    if (jobsRepo == null || threadId == null || threadId.isEmpty()) {
        throw new IllegalArgumentException("Invalid input given provided");
    }
    if (!jobsRepo.isEmpty()) {
        throw new IllegalArgumentException(
                "The jobs repo provided contains stale jobs. Clear them!!");
    }

    // Jobs queued under this thread's own id are served first, one at a time.
    LinkedBlockingDeque<Job> jobsQueue = jobsDb.get(threadId);
    if (jobsQueue != null && !jobsQueue.isEmpty()) {
        Job j = jobsQueue.poll(waitTime, TimeUnit.MILLISECONDS);
        if (j != null) {
            jobsRepo.put(j.getJobId(), j);
            return;
        }
        // The queue was drained between isEmpty() and poll(); fall through to the shared queue.
    }

    // Acquire the lock BEFORE entering try: if acquisition is interrupted, the finally block
    // must not run, otherwise a hold taken by an outer (reentrant) caller could be released.
    lock.lockInterruptibly();
    try {
        jobsQueue = jobsDb.get(DEFAULT_DATA_QUEUE);

        // Block until the first job of the batch arrives; take() never returns null.
        Job j = jobsQueue.take();
        logger.trace("get first job: {}", j.getJobId());
        jobsRepo.put(j.getJobId(), j);

        // Track the remaining batch-delay budget with the monotonic clock so that wall-clock
        // adjustments can neither extend nor cut short the batching window.
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(maxBatchDelay);
        long lastTick = System.nanoTime();
        for (int i = 0; i < batchSize - 1; ++i) {
            j = jobsQueue.poll(remainingNanos, TimeUnit.NANOSECONDS);
            if (j == null) {
                // Budget exhausted without another job showing up.
                break;
            }
            long now = System.nanoTime();
            remainingNanos -= now - lastTick;
            lastTick = now;
            jobsRepo.put(j.getJobId(), j);
            if (remainingNanos <= 0) {
                break;
            }
        }
        logger.trace("sending jobs, size: {}", jobsRepo.size());
    } finally {
        lock.unlock();
    }
}