public void doWork()

in src/main/java/com/google/gcs/sdrs/service/worker/impl/DmBatchProcessingWorker.java [73:132]


  public void doWork() {
    Session currentLockSession = lockDao.getLockSession();
    try {

      logger.info(String.format("acquiring lock at %s", Instant.now(Clock.systemUTC()).toString()));
      DistributedLock distributedLock =
          lockDao.obtainLock(currentLockSession, DM_LOCK_TIMEOUT, DM_LOCK_ID);
      if (distributedLock != null) {
        logger.info(
            String.format(
                "acquired lock %s at %s",
                distributedLock.getLockToken(), Instant.now(Clock.systemUTC()).toString()));

        // get sorted (by priority) list of all available queue for processing
        List<DmRequest> allAvailableRequetsForProcessing =
            dmQueueDao.getAllAvailableRequestsByPriority();

        // sort the list by bucket while keeping the same order
        Map<String, List<DmRequest>> dmRequestsMap =
            allAvailableRequetsForProcessing.stream()
                .collect(Collectors.groupingBy(DmRequest::getDataStorageRoot));

        List<String> failedDmProcessingBuckets = new ArrayList<>();

        for (String bucket : dmRequestsMap.keySet()) {
          if (!processDmRequestByBucket(bucket, dmRequestsMap.get(bucket))) {
            failedDmProcessingBuckets.add(bucket);
          }
        }

        if (failedDmProcessingBuckets.isEmpty()) {
          logger.info(
              String.format(
                  "Successfully processed DM requests for %d buckets.", dmRequestsMap.size()));
          workerResult.setStatus(WorkerResultStatus.SUCCESS);
        } else {
          logger.error(
              String.format(
                  "DM requests processing failed for %d out of %d buckets.",
                  failedDmProcessingBuckets.size(), dmRequestsMap.size()));
          workerResult.setStatus(WorkerResultStatus.FAILED);
        }

        lockDao.releaseLock(currentLockSession, distributedLock);
        logger.info(
            String.format(
                "released lock %s at %s",
                distributedLock.getLockToken(), Instant.now(Clock.systemUTC()).toString()));
      } else {
        logger.info("Can not acquire lock.");
        workerResult.setStatus(WorkerResultStatus.SUCCESS);
      }
    } catch (Exception e) {
      logger.error("Unknown error. ", e);
      workerResult.setStatus(WorkerResultStatus.FAILED);
    } finally {
      // clean up resource
      lockDao.closeLockSession(currentLockSession);
    }
  }