in spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java [111:158]
public void acquireLock(final ScheduledJobParameter jobParameter,
final JobExecutionContext context, ActionListener<LockModel> listener) {
final String jobIndexName = context.getJobIndexName();
final String jobId = context.getJobId();
if (jobParameter.getLockDurationSeconds() == null) {
listener.onFailure(new IllegalArgumentException("Job LockDuration should not be null"));
} else {
final long lockDurationSecond = jobParameter.getLockDurationSeconds();
createLockIndex(ActionListener.wrap(
created -> {
if (created) {
try {
findLock(LockModel.generateLockId(jobIndexName, jobId), ActionListener.wrap(
existingLock -> {
if (existingLock != null) {
if (isLockReleasedOrExpired(existingLock)) {
// Lock is expired. Attempt to acquire lock.
logger.debug("lock is released or expired: " + existingLock);
LockModel updateLock = new LockModel(existingLock, getNow(),
lockDurationSecond, false);
updateLock(updateLock, listener);
} else {
logger.debug("Lock is NOT released or expired. " + existingLock);
// Lock is still not expired. Return null as we cannot acquire lock.
listener.onResponse(null);
}
} else {
// There is no lock object and it is first time. Create new lock.
LockModel tempLock = new LockModel(jobIndexName, jobId, getNow(),
lockDurationSecond, false);
logger.debug("Lock does not exist. Creating new lock" + tempLock);
createLock(tempLock, listener);
}
},
listener::onFailure
));
} catch (VersionConflictEngineException e) {
logger.debug("could not acquire lock {}", e.getMessage());
listener.onResponse(null);
}
} else {
listener.onResponse(null);
}
},
listener::onFailure
));
}
}