in src/main/java/com/google/gcs/sdrs/service/worker/impl/DmBatchProcessingWorker.java [73:132]
public void doWork() {
Session currentLockSession = lockDao.getLockSession();
try {
logger.info(String.format("acquiring lock at %s", Instant.now(Clock.systemUTC()).toString()));
DistributedLock distributedLock =
lockDao.obtainLock(currentLockSession, DM_LOCK_TIMEOUT, DM_LOCK_ID);
if (distributedLock != null) {
logger.info(
String.format(
"acquired lock %s at %s",
distributedLock.getLockToken(), Instant.now(Clock.systemUTC()).toString()));
// get sorted (by priority) list of all available queue for processing
List<DmRequest> allAvailableRequetsForProcessing =
dmQueueDao.getAllAvailableRequestsByPriority();
// sort the list by bucket while keeping the same order
Map<String, List<DmRequest>> dmRequestsMap =
allAvailableRequetsForProcessing.stream()
.collect(Collectors.groupingBy(DmRequest::getDataStorageRoot));
List<String> failedDmProcessingBuckets = new ArrayList<>();
for (String bucket : dmRequestsMap.keySet()) {
if (!processDmRequestByBucket(bucket, dmRequestsMap.get(bucket))) {
failedDmProcessingBuckets.add(bucket);
}
}
if (failedDmProcessingBuckets.isEmpty()) {
logger.info(
String.format(
"Successfully processed DM requests for %d buckets.", dmRequestsMap.size()));
workerResult.setStatus(WorkerResultStatus.SUCCESS);
} else {
logger.error(
String.format(
"DM requests processing failed for %d out of %d buckets.",
failedDmProcessingBuckets.size(), dmRequestsMap.size()));
workerResult.setStatus(WorkerResultStatus.FAILED);
}
lockDao.releaseLock(currentLockSession, distributedLock);
logger.info(
String.format(
"released lock %s at %s",
distributedLock.getLockToken(), Instant.now(Clock.systemUTC()).toString()));
} else {
logger.info("Can not acquire lock.");
workerResult.setStatus(WorkerResultStatus.SUCCESS);
}
} catch (Exception e) {
logger.error("Unknown error. ", e);
workerResult.setStatus(WorkerResultStatus.FAILED);
} finally {
// clean up resource
lockDao.closeLockSession(currentLockSession);
}
}