private boolean processDmRequestByBucket()

in src/main/java/com/google/gcs/sdrs/service/worker/impl/DmBatchProcessingWorker.java [134:266]


  private boolean processDmRequestByBucket(String bucket, List<DmRequest> dmRequests) {
    String destinationBucket = StsUtil.buildDestinationBucketName(bucket);

    ZonedDateTime zonedDateTimeNow = ZonedDateTime.now(Clock.systemUTC());
    String scheduleTimeOfDay = zonedDateTimeNow.format(DateTimeFormatter.ofPattern("HH:mm:ss"));
    String projectId = dmRequests.get(0).getProjectId();

    TransferJob transferJob = null;
    try {
      transferJob =
          StsRuleExecutor.getInstance()
              .findPooledJob(projectId, bucket, scheduleTimeOfDay, RetentionRuleType.USER);
    } catch (IOException e) {
      // Can't allocate the job from the pool. Fail immediately.
      return false;
    }

    TimeOfDay jobRunAtTimeOfDay = transferJob.getSchedule().getStartTimeOfDay();

    // derived from transferJob.tomeOfDay and now
    ZonedDateTime lastRunTime =
        ZonedDateTime.of(
                zonedDateTimeNow.getYear(),
                zonedDateTimeNow.getMonthValue(),
                zonedDateTimeNow.getDayOfMonth(),
                jobRunAtTimeOfDay.getHours() != null ? jobRunAtTimeOfDay.getHours() : 0,
                jobRunAtTimeOfDay.getMinutes() != null ? jobRunAtTimeOfDay.getMinutes() : 0,
                jobRunAtTimeOfDay.getSeconds() != null ? jobRunAtTimeOfDay.getSeconds() : 0,
                jobRunAtTimeOfDay.getNanos() != null ? jobRunAtTimeOfDay.getNanos() : 0,
                ZoneId.of("UTC"))
            .minusHours(24);

    ZonedDateTime lastModifiedTime = ZonedDateTime.parse(transferJob.getLastModificationTime());
    List<String> existingIncludePrefixList = new ArrayList<>();
    if (transferJob.getStatus().equals(StsUtil.STS_ENABLED_STRING)) {
      ObjectConditions objectConditions = transferJob.getTransferSpec().getObjectConditions();
      if (objectConditions != null && objectConditions.getIncludePrefixes() != null) {
        existingIncludePrefixList = objectConditions.getIncludePrefixes();
      }
    }

    int initPrefxiNumber = 0;

    Set<String> newIncludePrefixSet = new HashSet<>();
    // Replace the existing prefix list if last modified time is older than the last job run time,
    // meaning the daily STS job has already run and the existing prefix list has been processed.
    if (lastModifiedTime.isBefore(lastRunTime)) {
      initPrefxiNumber = StsUtil.MAX_PREFIX_COUNT;
    } else {
      initPrefxiNumber = StsUtil.MAX_PREFIX_COUNT - existingIncludePrefixList.size();
      newIncludePrefixSet.addAll(existingIncludePrefixList);
    }

    int maxCount = Math.min(initPrefxiNumber, dmRequests.size());
    int total = 0;
    int start = 0;
    List<String> newIncludePrefixList = null;

    while (total < StsUtil.MAX_PREFIX_COUNT && start < dmRequests.size()) {
      for (int i = 0; i < maxCount; i++) {
        String prefix =
            RetentionUtil.getDatasetPath(dmRequests.get(i + start).getDataStorageName());
        if (prefix != null && !prefix.isEmpty()) {
          if (!prefix.endsWith("/")) {
            prefix = prefix + "/";
          }
          newIncludePrefixSet.add(prefix);
        }
      }
      newIncludePrefixList =
          RetentionUtil.consolidateDmPrefixes(new ArrayList<>(newIncludePrefixSet));
      total = newIncludePrefixList.size();
      start = maxCount + start;
      maxCount = Math.min(StsUtil.MAX_PREFIX_COUNT - total, dmRequests.size() - start);
      newIncludePrefixSet = new HashSet<>(newIncludePrefixList);
    }
    // update STS job
    TransferJob jobToUpdate =
        new TransferJob()
            .setDescription(
                StsRuleExecutor.buildDescription(
                    RetentionRuleType.USER.toString(), null, scheduleTimeOfDay))
            .setTransferSpec(
                StsUtil.buildTransferSpec(
                    bucket, destinationBucket, newIncludePrefixList, false, null))
            .setStatus(StsUtil.STS_ENABLED_STRING);
    try {
      transferJob =
          StsUtil.updateExistingJob(client, jobToUpdate, transferJob.getName(), projectId);
    } catch (IOException e) {
      // Update STS job failed. Fail the process immediately.
      logger.error("Failed to update STS job.", e);
      return false;
    }

    RetentionRule retentionRule = new RetentionRule();
    retentionRule.setProjectId(projectId);
    retentionRule.setDataStorageName(ValidationConstants.STORAGE_PREFIX + bucket);
    retentionRule.setType(RetentionRuleType.USER);

    RetentionJob retentionJob =
        StsRuleExecutor.buildRetentionJobEntity(
            transferJob.getName(),
            retentionRule,
            StsUtil.convertPrefixToString(newIncludePrefixList),
            new Timestamp(Instant.parse(transferJob.getLastModificationTime()).toEpochMilli()));
    retentionJob.setBatchId(getUuid());

    // update retention_job and dm_queue tables
    List<DmRequest> processedDmRequests = dmRequests.subList(0, start);
    dmRequests.stream()
        .forEach(
            request -> {
              if (request.getStatus().equals(DatabaseConstants.DM_REQUEST_STATIUS_RETRY)) {
                request.setNumberOfRetry(request.getNumberOfRetry() + 1);
                request.setPriority(
                    RetentionUtil.generatePriority(
                        request.getNumberOfRetry(),
                        request.getCreatedAt().toInstant().toEpochMilli()));
              }
              request.setStatus(DatabaseConstants.DM_REQUEST_STATUS_SCHEDULED);
            });

    try {
      dmQueueDao.createRetentionJobUdpateDmStatus(retentionJob, processedDmRequests);
    } catch (IOException e) {
      // Database transaction failed. However the process is still considered success as STS job has
      // been updated successfully.
      logger.error("Failed to create retention job and update DM request status.");
    }

    return true;
  }