in gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java [261:363]
/**
 * Attempts to acquire the lease for the dag action described by {@code leaseParams}, branching on what the lease
 * arbiter table currently holds for that dag action:
 * <ul>
 *   <li>CASE 1: no row exists yet - try to create one and evaluate the outcome</li>
 *   <li>CASE 2: same event (within epsilon) and lease is valid - report the lease as held by another participant</li>
 *   <li>CASE 3: distinct event and lease is valid - report the lease as held by another participant, using the DB
 *       current time as the new event time</li>
 *   <li>CASE 4: lease is out-of-date (same or distinct event) - try to take the lease over</li>
 *   <li>CASE 5: same event and the DB is no longer leasing it - nothing left to do</li>
 *   <li>CASE 6: distinct event and the DB is no longer leasing the previous one - try to acquire the lease</li>
 * </ul>
 * Before any of CASES 2-6, a reminder whose event time is older than the DB event time is discarded by returning
 * {@link LeaseAttemptStatus.NoLongerLeasingStatus}.
 *
 * @param leaseParams the dag action, its event time and whether this attempt is a reminder
 * @param adoptConsensusFlowExecutionId when true, the DB (laundered) timestamp replaces the dag action's
 *        flowExecutionId in the returned status; when false the original dag action is kept
 * @return the status describing the outcome of this lease attempt
 * @throws IOException as declared; presumably raised by the table lookup or the lease-attempt helpers
 * @throws RuntimeException wrapping any {@link SQLException} raised while attempting the lease
 */
private LeaseAttemptStatus doTryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException {
  // Query lease arbiter table about this dag action
  Optional<GetEventInfoResult> getResult = getExistingEventInfo(leaseParams);
  try {
    if (!getResult.isPresent()) {
      log.debug("tryAcquireLease for {} - CASE 1: DagAction has no row yet - create now", contextualizeLeasing(leaseParams));
      int numRowsUpdated = attemptLeaseIfNewRow(leaseParams.getDagAction(),
          ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
              .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) (Math.random() * DELAY_FOR_RETRY_RANGE_MILLIS))
              .build());
      return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, Optional.empty(),
          adoptConsensusFlowExecutionId);
    }

    // Extract values from result set (unwrap the Optional once rather than per field)
    GetEventInfoResult eventInfo = getResult.get();
    Timestamp dbEventTimestamp = eventInfo.getDbEventTimestamp();
    Timestamp dbLeaseAcquisitionTimestamp = eventInfo.getDbLeaseAcquisitionTimestamp();
    boolean isWithinEpsilon = eventInfo.isWithinEpsilon();
    int leaseValidityStatus = eventInfo.getLeaseValidityStatus();
    // Used to calculate minimum amount of time until a participant should check whether a lease expired
    int dbLinger = eventInfo.getDbLinger();
    Timestamp dbCurrentTimestamp = eventInfo.getDbCurrentTimestamp();

    // stop early with reminder events when the reminder's eventTimeMillis is older than that of the current DB event; DB laundering should
    // guarantee that the current DB event is truly a distinct newer event (vs. clock drift) and thus should have separate reminders of its own
    if (leaseParams.isReminder()) {
      long reminderEventTimeMillis = leaseParams.getEventTimeMillis();
      // Log epoch millis (not the Timestamp object) so the value matches the `dbEventTimeMillis` label in every branch
      long dbEventTimeMillis = dbEventTimestamp.getTime();
      if (reminderEventTimeMillis < dbEventTimeMillis) {
        log.info("tryAcquireLease for {} (dbEventTimeMillis: {}) - Newer DB time, so discarding out-of-date reminder",
            contextualizeLeasing(leaseParams), dbEventTimeMillis);
        return new LeaseAttemptStatus.NoLongerLeasingStatus();
      } else if (reminderEventTimeMillis > dbEventTimeMillis) {
        // TODO: emit metric here to capture this unexpected behavior
        log.warn("tryAcquireLease for {} (dbEventTimeMillis: {}) - Severe constraint violation: DB time OLDER than reminder event, when DB laundering "
            + "ought to ensure monotonically increasing (laundered) event times.", contextualizeLeasing(leaseParams), dbEventTimeMillis);
      } else {
        log.debug("tryAcquireLease for {} (dbEventTimeMillis: {}) - DB time matches reminder", contextualizeLeasing(leaseParams), dbEventTimeMillis);
      }
    }

    // TODO: check whether reminder event before replacing flowExecutionId
    if (adoptConsensusFlowExecutionId) {
      log.info("Multi-active will use DB time ({}) to launder {}", dbCurrentTimestamp.getTime(), contextualizeLeasing(leaseParams));
    }
    /* Note that we use `adoptConsensusFlowExecutionId` parameter's value to determine whether we should use the db
       laundered event timestamp as the flowExecutionId or maintain the original one
     */
    // Lease is valid
    if (leaseValidityStatus == 1) {
      // Time remaining until the current lease lapses, measured against the DB clock; shared by CASES 2 and 3
      long minLeaseLingerMillis = dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime();
      if (isWithinEpsilon) {
        DagActionStore.DagAction updatedDagAction =
            adoptConsensusFlowExecutionId ? leaseParams.updateDagActionFlowExecutionId(dbEventTimestamp.getTime()) : leaseParams.getDagAction();
        DagActionStore.LeaseParams updatedLeaseParams = new DagActionStore.LeaseParams(updatedDagAction,
            dbEventTimestamp.getTime());
        log.debug("tryAcquireLease for {} - CASE 2: Same event, lease is valid", contextualizeLeasing(updatedLeaseParams));
        // Utilize db timestamp for reminder
        return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams, minLeaseLingerMillis);
      }
      DagActionStore.DagAction updatedDagAction =
          adoptConsensusFlowExecutionId ? leaseParams.getDagAction().updateFlowExecutionId(dbCurrentTimestamp.getTime()) : leaseParams.getDagAction();
      DagActionStore.LeaseParams updatedLeaseParams = new DagActionStore.LeaseParams(updatedDagAction,
          dbCurrentTimestamp.getTime());
      log.debug("tryAcquireLease for {} - CASE 3: Distinct event, lease is valid", contextualizeLeasing(updatedLeaseParams));
      // Utilize db lease acquisition timestamp for wait time and currentTimestamp as the new eventTimestamp
      return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams, minLeaseLingerMillis);
    } // Lease is invalid
    else if (leaseValidityStatus == 2) {
      log.debug("tryAcquireLease for {} (DB current time: {}) - CASE 4: Lease out-of-date (regardless of "
          + "whether same or distinct event)", contextualizeLeasing(leaseParams), dbCurrentTimestamp.getTime());
      // Use the accessor here, consistent with the reminder check earlier in this method
      if (isWithinEpsilon && !leaseParams.isReminder()) {
        log.warn("Lease should not be out-of-date for the same trigger event, if epsilon << linger for {} "
                + "(DB eventTimestamp: {}; DB leaseAcquisitionTimestamp: {}; DB linger: {})",
            contextualizeLeasing(leaseParams), dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
      }
      // Use our event to acquire lease, check for previous DB eventTimestamp and leaseAcquisitionTimestamp
      int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
          leaseParams.getDagAction(), true, true, dbEventTimestamp,
          dbLeaseAcquisitionTimestamp);
      return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, Optional.of(dbCurrentTimestamp),
          adoptConsensusFlowExecutionId);
    } // No longer leasing this event
    if (isWithinEpsilon) {
      log.debug("tryAcquireLease for {} (DB current time: {}) - CASE 5: Same event, no longer leasing event in DB",
          contextualizeLeasing(leaseParams), dbCurrentTimestamp.getTime());
      return new LeaseAttemptStatus.NoLongerLeasingStatus();
    }
    log.debug("tryAcquireLease for {} (DB current time: {}) - CASE 6: Distinct event, no longer leasing event in DB",
        contextualizeLeasing(leaseParams), dbCurrentTimestamp.getTime());
    // Use our event to acquire lease, check for previous DB eventTimestamp and NULL leaseAcquisitionTimestamp
    int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement,
        leaseParams.getDagAction(), true, false, dbEventTimestamp,
        null);
    return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, Optional.of(dbCurrentTimestamp),
        adoptConsensusFlowExecutionId);
  } catch (SQLException e) {
    // Kept as an unchecked rethrow (cause preserved) so the exception type seen by callers is unchanged
    throw new RuntimeException(e);
  }
}