public List executeDatasetRule()

in src/main/java/com/google/gcs/sdrs/service/worker/rule/impl/StsRuleExecutor.java [180:301]


  /**
   * Schedules STS transfer jobs for dataset retention rules, one job per bucket.
   *
   * <p>The rules are grouped per bucket via {@code buildBucketRuleMap}. For each bucket, object
   * prefixes are generated for every rule of type {@code DATASET}; rules of any other type are
   * skipped with a warning. When at least one prefix was generated, an inactive-dataset
   * notification is sent and a pooled STS job is updated if one is found; otherwise a new job is
   * created unless {@code StsUtil.IS_STS_JOBPOOL_ONLY} is set.
   *
   * <p>A {@link RetentionJob} entity is built for every rule of the bucket, whether or not a job
   * was scheduled. When no job was scheduled (no prefixes, pool-only mode without a pooled job, or
   * an {@link IOException} while scheduling), the job name and creation timestamp handed to
   * {@code buildRetentionJobEntity} are {@code null}.
   *
   * @param datasetRules the retention rules to execute
   * @param projectId the project ID used for the STS calls and the notification
   * @return the retention job entities built for the rules, in bucket iteration order
   */
  public List<RetentionJob> executeDatasetRule(
      Collection<RetentionRule> datasetRules, String projectId) {
    List<RetentionJob> datasetRuleJobs = new ArrayList<>();
    // get all dataset rules for a bucket
    Map<String, List<RetentionRule>> bucketDatasetMap = buildBucketRuleMap(datasetRules);
    String correlationId = getCorrelationId();
    // a single "now" is shared by all buckets so prefixes, schedule and notification agree
    ZonedDateTime zonedDateTimeNow = ZonedDateTime.now(Clock.systemUTC());
    String scheduleTimeOfDay = zonedDateTimeNow.format(DateTimeFormatter.ofPattern("HH:mm:ss"));

    for (Map.Entry<String, List<RetentionRule>> bucketEntry : bucketDatasetMap.entrySet()) {
      String bucketName = bucketEntry.getKey();
      List<RetentionRule> bucketRules = bucketEntry.getValue();

      List<String> prefixes = new ArrayList<>();
      Map<String, List<String>> prefixesPerDatasetMap = new HashMap<>();

      // create prefixes from all dataset rules for a bucket
      for (RetentionRule datasetRule : bucketRules) {
        if (datasetRule.getType() != RetentionRuleType.DATASET) {
          logger.warn("Rule type is not dataset.");
          continue;
        }

        List<String> rulePrefixes =
            generatePrefixesForDatasetRule(datasetRule, bucketName, zonedDateTimeNow);
        prefixesPerDatasetMap.put(datasetRule.getDataStorageName(), rulePrefixes);
        prefixes.addAll(rulePrefixes);
      }
      if (!prefixes.isEmpty()) {
        sendInactiveDatasetNotification(
            projectId, bucketName, prefixes, zonedDateTimeNow.toInstant(), correlationId);
      }

      String sourceBucket = bucketName;
      String destinationBucket = StsUtil.buildDestinationBucketName(bucketName);
      String description =
          buildDescription(RetentionRuleType.DATASET.toString(), bucketRules, scheduleTimeOfDay);

      logger.info(
          String.format(
              "Scheduling dataset STS job with projectId: %s, "
                  + "description: %s, source: %s, destination: %s",
              projectId, description, sourceBucket, destinationBucket));

      TransferJob job = null;
      try {
        if (!prefixes.isEmpty()) {
          TransferJob stsPooledJob =
              findPooledJob(projectId, bucketName, scheduleTimeOfDay, RetentionRuleType.DATASET);
          if (stsPooledJob == null) {
            // no pooled job available: create a new one only when that is permitted
            if (!StsUtil.IS_STS_JOBPOOL_ONLY) {
              job =
                  StsUtil.createStsJob(
                      client,
                      projectId,
                      sourceBucket,
                      destinationBucket,
                      prefixes,
                      description,
                      zonedDateTimeNow);
            }
          } else {
            // reuse the pooled job by overwriting its description, transfer spec and status
            TransferJob jobToUpdate = new TransferJob();
            jobToUpdate
                .setDescription(description)
                .setTransferSpec(
                    StsUtil.buildTransferSpec(sourceBucket, destinationBucket, prefixes, false, null))
                .setStatus(StsUtil.STS_ENABLED_STRING);
            job = StsUtil.updateExistingJob(client, jobToUpdate, stsPooledJob.getName(), projectId);
          }
        } else {
          logger.error(String.format("There is not prefix generated for bucket %s", bucketName));
        }
      } catch (IOException e) {
        // a scheduling failure for one bucket must not stop the remaining buckets
        logger.error(
            String.format(
                "Failed to schedule dataset STS job for %s/%s. %s",
                projectId, sourceBucket, e.getMessage()),
            e);
      }

      String jobName = null;
      Timestamp createdAt = null;
      if (job != null) {
        jobName = job.getName();
        createdAt = new Timestamp(Instant.parse(job.getLastModificationTime()).toEpochMilli());
      }

      for (RetentionRule datasetRule : bucketRules) {
        // Rules skipped above (type is not DATASET) have no entry in the map. Hand an empty list
        // to the converter instead of null, matching what rules with failed prefix generation
        // already pass.
        List<String> rulePrefixes =
            prefixesPerDatasetMap.getOrDefault(
                datasetRule.getDataStorageName(), new ArrayList<>());
        datasetRuleJobs.add(
            buildRetentionJobEntity(
                jobName, datasetRule, StsUtil.convertPrefixToString(rulePrefixes), createdAt));
      }
    }

    return datasetRuleJobs;
  }

  /**
   * Generates the object prefixes covered by a single dataset rule.
   *
   * <p>For a retention value with unit type {@code VERSION}, the objects under the dataset path
   * are listed in the bucket and the prefixes are derived from that listing and the retention
   * number. For any other unit type, time-based prefixes are generated between
   * {@code now - StsUtil.STS_LOOKBACK_DAYS} days and {@code now - convertValue(retentionValue)}
   * days.
   *
   * @param datasetRule the dataset rule to generate prefixes for
   * @param bucketName the bucket whose objects are listed for version-based retention
   * @param now the reference time used for time-based prefixes
   * @return the generated prefixes; an empty list when generation fails with an
   *     {@link IllegalArgumentException}, which is logged rather than rethrown
   */
  private List<String> generatePrefixesForDatasetRule(
      RetentionRule datasetRule, String bucketName, ZonedDateTime now) {
    RetentionValue retentionValue = RetentionValue.parse(datasetRule.getRetentionValue());
    String datasetPath = RetentionUtil.getDatasetPath(datasetRule.getDataStorageName());
    List<String> tmpPrefixes = new ArrayList<>();

    try {
      if (retentionValue.getUnitType() == RetentionUnitType.VERSION) {
        String prefix = RetentionUtil.generateValidPrefixForListingObjects(datasetPath);
        List<String> objectsPath =
            GcsHelper.getInstance().listObjectsWithPrefixInBucket(bucketName, prefix);
        tmpPrefixes =
            PrefixGeneratorUtility.generateVersionPrefix(objectsPath, retentionValue.getNumber());
      } else {
        tmpPrefixes =
            PrefixGeneratorUtility.generateTimePrefixes(
                datasetPath,
                now.minusDays(StsUtil.STS_LOOKBACK_DAYS),
                now.minusDays(RetentionValue.convertValue(retentionValue)));
      }
    } catch (IllegalArgumentException e) {
      // best effort: a rule whose prefixes cannot be generated contributes no prefixes
      logger.error(
          String.format(
              "Failed to generate prefix for dataset %s. %s", datasetPath, e.getMessage()), e);
    }
    return tmpPrefixes;
  }