in src/main/java/com/google/gcs/sdrs/service/worker/impl/DmBatchProcessingWorker.java [134:266]
private boolean processDmRequestByBucket(String bucket, List<DmRequest> dmRequests) {
String destinationBucket = StsUtil.buildDestinationBucketName(bucket);
ZonedDateTime zonedDateTimeNow = ZonedDateTime.now(Clock.systemUTC());
String scheduleTimeOfDay = zonedDateTimeNow.format(DateTimeFormatter.ofPattern("HH:mm:ss"));
String projectId = dmRequests.get(0).getProjectId();
TransferJob transferJob = null;
try {
transferJob =
StsRuleExecutor.getInstance()
.findPooledJob(projectId, bucket, scheduleTimeOfDay, RetentionRuleType.USER);
} catch (IOException e) {
// Can't allocate the job from the pool. Fail immediately.
return false;
}
TimeOfDay jobRunAtTimeOfDay = transferJob.getSchedule().getStartTimeOfDay();
// derived from transferJob.tomeOfDay and now
ZonedDateTime lastRunTime =
ZonedDateTime.of(
zonedDateTimeNow.getYear(),
zonedDateTimeNow.getMonthValue(),
zonedDateTimeNow.getDayOfMonth(),
jobRunAtTimeOfDay.getHours() != null ? jobRunAtTimeOfDay.getHours() : 0,
jobRunAtTimeOfDay.getMinutes() != null ? jobRunAtTimeOfDay.getMinutes() : 0,
jobRunAtTimeOfDay.getSeconds() != null ? jobRunAtTimeOfDay.getSeconds() : 0,
jobRunAtTimeOfDay.getNanos() != null ? jobRunAtTimeOfDay.getNanos() : 0,
ZoneId.of("UTC"))
.minusHours(24);
ZonedDateTime lastModifiedTime = ZonedDateTime.parse(transferJob.getLastModificationTime());
List<String> existingIncludePrefixList = new ArrayList<>();
if (transferJob.getStatus().equals(StsUtil.STS_ENABLED_STRING)) {
ObjectConditions objectConditions = transferJob.getTransferSpec().getObjectConditions();
if (objectConditions != null && objectConditions.getIncludePrefixes() != null) {
existingIncludePrefixList = objectConditions.getIncludePrefixes();
}
}
int initPrefxiNumber = 0;
Set<String> newIncludePrefixSet = new HashSet<>();
// Replace the existing prefix list if last modified time is older than the last job run time,
// meaning the daily STS job has already run and the existing prefix list has been processed.
if (lastModifiedTime.isBefore(lastRunTime)) {
initPrefxiNumber = StsUtil.MAX_PREFIX_COUNT;
} else {
initPrefxiNumber = StsUtil.MAX_PREFIX_COUNT - existingIncludePrefixList.size();
newIncludePrefixSet.addAll(existingIncludePrefixList);
}
int maxCount = Math.min(initPrefxiNumber, dmRequests.size());
int total = 0;
int start = 0;
List<String> newIncludePrefixList = null;
while (total < StsUtil.MAX_PREFIX_COUNT && start < dmRequests.size()) {
for (int i = 0; i < maxCount; i++) {
String prefix =
RetentionUtil.getDatasetPath(dmRequests.get(i + start).getDataStorageName());
if (prefix != null && !prefix.isEmpty()) {
if (!prefix.endsWith("/")) {
prefix = prefix + "/";
}
newIncludePrefixSet.add(prefix);
}
}
newIncludePrefixList =
RetentionUtil.consolidateDmPrefixes(new ArrayList<>(newIncludePrefixSet));
total = newIncludePrefixList.size();
start = maxCount + start;
maxCount = Math.min(StsUtil.MAX_PREFIX_COUNT - total, dmRequests.size() - start);
newIncludePrefixSet = new HashSet<>(newIncludePrefixList);
}
// update STS job
TransferJob jobToUpdate =
new TransferJob()
.setDescription(
StsRuleExecutor.buildDescription(
RetentionRuleType.USER.toString(), null, scheduleTimeOfDay))
.setTransferSpec(
StsUtil.buildTransferSpec(
bucket, destinationBucket, newIncludePrefixList, false, null))
.setStatus(StsUtil.STS_ENABLED_STRING);
try {
transferJob =
StsUtil.updateExistingJob(client, jobToUpdate, transferJob.getName(), projectId);
} catch (IOException e) {
// Update STS job failed. Fail the process immediately.
logger.error("Failed to update STS job.", e);
return false;
}
RetentionRule retentionRule = new RetentionRule();
retentionRule.setProjectId(projectId);
retentionRule.setDataStorageName(ValidationConstants.STORAGE_PREFIX + bucket);
retentionRule.setType(RetentionRuleType.USER);
RetentionJob retentionJob =
StsRuleExecutor.buildRetentionJobEntity(
transferJob.getName(),
retentionRule,
StsUtil.convertPrefixToString(newIncludePrefixList),
new Timestamp(Instant.parse(transferJob.getLastModificationTime()).toEpochMilli()));
retentionJob.setBatchId(getUuid());
// update retention_job and dm_queue tables
List<DmRequest> processedDmRequests = dmRequests.subList(0, start);
dmRequests.stream()
.forEach(
request -> {
if (request.getStatus().equals(DatabaseConstants.DM_REQUEST_STATIUS_RETRY)) {
request.setNumberOfRetry(request.getNumberOfRetry() + 1);
request.setPriority(
RetentionUtil.generatePriority(
request.getNumberOfRetry(),
request.getCreatedAt().toInstant().toEpochMilli()));
}
request.setStatus(DatabaseConstants.DM_REQUEST_STATUS_SCHEDULED);
});
try {
dmQueueDao.createRetentionJobUdpateDmStatus(retentionJob, processedDmRequests);
} catch (IOException e) {
// Database transaction failed. However the process is still considered success as STS job has
// been updated successfully.
logger.error("Failed to create retention job and update DM request status.");
}
return true;
}