in src/main/java/com/google/gcs/sdrs/service/worker/rule/impl/StsRuleExecutor.java [180:301]
/**
 * Schedules the STS transfer work for a collection of dataset retention rules.
 *
 * <p>Rules are grouped per bucket via {@code buildBucketRuleMap}. For every bucket the prefixes
 * of all its dataset rules are combined and, when at least one prefix was generated, handed to a
 * single transfer job: a pooled job found by {@code findPooledJob} is updated, otherwise a new
 * job is created unless {@code StsUtil.IS_STS_JOBPOOL_ONLY} is set. A failure to schedule one
 * bucket is logged and does not stop the remaining buckets.
 *
 * @param datasetRules the rules to execute; rules whose type is not {@code DATASET} are logged
 *     and contribute no prefixes
 * @param projectId the project the transfer jobs are scheduled in
 * @return one {@code RetentionJob} entity per rule of every bucket; the job name and creation
 *     timestamp passed to the entity are {@code null} when no transfer job was scheduled for the
 *     rule's bucket
 */
public List<RetentionJob> executeDatasetRule(
    Collection<RetentionRule> datasetRules, String projectId) {
  List<RetentionJob> datasetRuleJobs = new ArrayList<>();
  // Group the dataset rules by bucket so each bucket is served by one transfer job.
  Map<String, List<RetentionRule>> bucketDatasetMap = buildBucketRuleMap(datasetRules);
  String correlationId = getCorrelationId();
  // A single UTC "now" is shared by all buckets so prefixes and schedules are consistent.
  ZonedDateTime zonedDateTimeNow = ZonedDateTime.now(Clock.systemUTC());
  String scheduleTimeOfDay = zonedDateTimeNow.format(DateTimeFormatter.ofPattern("HH:mm:ss"));

  for (Map.Entry<String, List<RetentionRule>> bucketEntry : bucketDatasetMap.entrySet()) {
    String bucketName = bucketEntry.getKey();
    List<RetentionRule> bucketRules = bucketEntry.getValue();

    // All prefixes of the bucket (for the transfer job) and the prefixes of each individual
    // dataset (for the RetentionJob entities recorded below).
    List<String> prefixes = new ArrayList<>();
    Map<String, List<String>> prefixesPerDatasetMap = new HashMap<>();
    for (RetentionRule datasetRule : bucketRules) {
      if (datasetRule.getType() != RetentionRuleType.DATASET) {
        logger.warn("Rule type is not dataset.");
        continue;
      }
      List<String> rulePrefixes =
          generateDatasetRulePrefixes(datasetRule, bucketName, zonedDateTimeNow);
      prefixesPerDatasetMap.put(datasetRule.getDataStorageName(), rulePrefixes);
      prefixes.addAll(rulePrefixes);
    }

    if (!prefixes.isEmpty()) {
      sendInactiveDatasetNotification(
          projectId, bucketName, prefixes, zonedDateTimeNow.toInstant(), correlationId);
    }

    String destinationBucket = StsUtil.buildDestinationBucketName(bucketName);
    String description =
        buildDescription(RetentionRuleType.DATASET.toString(), bucketRules, scheduleTimeOfDay);
    logger.info(
        String.format(
            "Scheduling dataset STS job with projectId: %s, "
                + "description: %s, source: %s, destination: %s",
            projectId, description, bucketName, destinationBucket));

    TransferJob job = null;
    if (prefixes.isEmpty()) {
      logger.error(String.format("There is not prefix generated for bucket %s", bucketName));
    } else {
      try {
        TransferJob stsPooledJob =
            findPooledJob(projectId, bucketName, scheduleTimeOfDay, RetentionRuleType.DATASET);
        if (stsPooledJob != null) {
          // Re-purpose the pooled job: new description, new transfer spec, enabled.
          TransferJob jobToUpdate = new TransferJob();
          jobToUpdate
              .setDescription(description)
              .setTransferSpec(
                  StsUtil.buildTransferSpec(bucketName, destinationBucket, prefixes, false, null))
              .setStatus(StsUtil.STS_ENABLED_STRING);
          job = StsUtil.updateExistingJob(client, jobToUpdate, stsPooledJob.getName(), projectId);
        } else if (!StsUtil.IS_STS_JOBPOOL_ONLY) {
          // No pooled job available and ad-hoc jobs are allowed: create a new one.
          job =
              StsUtil.createStsJob(
                  client,
                  projectId,
                  bucketName,
                  destinationBucket,
                  prefixes,
                  description,
                  zonedDateTimeNow);
        }
      } catch (IOException e) {
        // Best effort per bucket: log and still record the rules below with a null job name.
        logger.error(
            String.format(
                "Failed to schedule dataset STS job for %s/%s. %s",
                projectId, bucketName, e.getMessage()),
            e);
      }
    }

    String jobName = null;
    Timestamp createdAt = null;
    if (job != null) {
      jobName = job.getName();
      createdAt = new Timestamp(Instant.parse(job.getLastModificationTime()).toEpochMilli());
    }

    for (RetentionRule datasetRule : bucketRules) {
      // Rules skipped above (type is not DATASET) have no map entry; fall back to an empty
      // list so a null prefix list is never handed to the converter.
      List<String> rulePrefixes =
          prefixesPerDatasetMap.getOrDefault(
              datasetRule.getDataStorageName(), new ArrayList<>());
      datasetRuleJobs.add(
          buildRetentionJobEntity(
              jobName, datasetRule, StsUtil.convertPrefixToString(rulePrefixes), createdAt));
    }
  }
  return datasetRuleJobs;
}

/**
 * Generates the object prefixes for a single dataset rule.
 *
 * <p>For a {@code VERSION} retention unit the objects under the dataset path are listed in the
 * bucket and passed to {@code PrefixGeneratorUtility.generateVersionPrefix}; for any other unit
 * {@code PrefixGeneratorUtility.generateTimePrefixes} is called with the window from
 * {@code StsUtil.STS_LOOKBACK_DAYS} days before {@code now} to the converted retention value in
 * days before {@code now}.
 *
 * @param datasetRule the dataset rule to generate prefixes for
 * @param bucketName the bucket the rule's dataset lives in
 * @param now the reference time of this execution
 * @return the generated prefixes, or an empty list when prefix generation fails with an
 *     {@code IllegalArgumentException} (the failure is logged)
 */
private List<String> generateDatasetRulePrefixes(
    RetentionRule datasetRule, String bucketName, ZonedDateTime now) {
  RetentionValue retentionValue = RetentionValue.parse(datasetRule.getRetentionValue());
  String datasetPath = RetentionUtil.getDatasetPath(datasetRule.getDataStorageName());
  try {
    if (retentionValue.getUnitType() == RetentionUnitType.VERSION) {
      String listingPrefix = RetentionUtil.generateValidPrefixForListingObjects(datasetPath);
      List<String> objectsPath =
          GcsHelper.getInstance().listObjectsWithPrefixInBucket(bucketName, listingPrefix);
      return PrefixGeneratorUtility.generateVersionPrefix(
          objectsPath, retentionValue.getNumber());
    }
    return PrefixGeneratorUtility.generateTimePrefixes(
        datasetPath,
        now.minusDays(StsUtil.STS_LOOKBACK_DAYS),
        now.minusDays(RetentionValue.convertValue(retentionValue)));
  } catch (IllegalArgumentException e) {
    logger.error(
        String.format(
            "Failed to generate prefix for dataset %s. %s", datasetPath, e.getMessage()), e);
    return new ArrayList<>();
  }
}