private LeaseAttemptStatus doTryAcquireLease()

in gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java [261:363]


  private LeaseAttemptStatus doTryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException {
    // Query lease arbiter table about this dag action
    Optional<GetEventInfoResult> getResult = getExistingEventInfo(leaseParams);

    try {
      if (!getResult.isPresent()) {
        log.debug("tryAcquireLease for {} - CASE 1: DagAction has no row yet - create now", contextualizeLeasing(leaseParams));
        int numRowsUpdated = attemptLeaseIfNewRow(leaseParams.getDagAction(),
            ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
                .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) (Math.random() * DELAY_FOR_RETRY_RANGE_MILLIS))
                .build());
       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, Optional.empty(),
           adoptConsensusFlowExecutionId);
      }

      // Extract values from result set
      Timestamp dbEventTimestamp = getResult.get().getDbEventTimestamp();
      Timestamp dbLeaseAcquisitionTimestamp = getResult.get().getDbLeaseAcquisitionTimestamp();
      boolean isWithinEpsilon = getResult.get().isWithinEpsilon();
      int leaseValidityStatus = getResult.get().getLeaseValidityStatus();
      // Used to calculate minimum amount of time until a participant should check whether a lease expired
      int dbLinger = getResult.get().getDbLinger();
      Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();

      // stop early with reminder events when the reminder's eventTimeMillis is older than that of the current DB event; DB laundering should
      // guarantee that the current DB event is truly a distinct newer event (vs. clock drift) and thus should have separate reminders of its own
      if (leaseParams.isReminder()) {
        if (leaseParams.getEventTimeMillis() < dbEventTimestamp.getTime()) {
          log.info("tryAcquireLease for {} (dbEventTimeMillis: {}) - Newer DB time, so discarding out-of-date reminder",
              contextualizeLeasing(leaseParams), dbEventTimestamp);
          return new LeaseAttemptStatus.NoLongerLeasingStatus();
        }
        if (leaseParams.getEventTimeMillis() > dbEventTimestamp.getTime()) {
          // TODO: emit metric here to capture this unexpected behavior
          log.warn("tryAcquireLease for {} (dbEventTimeMillis: {}) - Severe constraint violation: DB time OLDER than reminder event, when DB laundering "
              + "ought to ensure monotonically increasing (laundered) event times.", contextualizeLeasing(leaseParams), dbEventTimestamp.getTime());
        }
        if (leaseParams.getEventTimeMillis() == dbEventTimestamp.getTime()) {
          log.debug("tryAcquireLease for {} (dbEventTimeMillis: {}) - DB time matches reminder", contextualizeLeasing(leaseParams), dbEventTimestamp);
        }
      }

      // TODO: check whether reminder event before replacing flowExecutionId
      if (adoptConsensusFlowExecutionId) {
        log.info("Multi-active will use DB time ({}) to launder {}", dbCurrentTimestamp.getTime(), contextualizeLeasing(leaseParams));
      }
      /* Note that we use `adoptConsensusFlowExecutionId` parameter's value to determine whether we should use the db
      laundered event timestamp as the flowExecutionId or maintain the original one
       */

      // Lease is valid
      if (leaseValidityStatus == 1) {
        if (isWithinEpsilon) {
         DagActionStore.DagAction updatedDagAction =
              adoptConsensusFlowExecutionId ? leaseParams.updateDagActionFlowExecutionId(dbEventTimestamp.getTime()) : leaseParams.getDagAction();
         DagActionStore.LeaseParams updatedLeaseParams = new DagActionStore.LeaseParams(updatedDagAction,
             dbEventTimestamp.getTime());
          log.debug("tryAcquireLease for {} - CASE 2: Same event, lease is valid", contextualizeLeasing(updatedLeaseParams));
          // Utilize db timestamp for reminder
          return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams,
              dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
        }
        DagActionStore.DagAction updatedDagAction =
            adoptConsensusFlowExecutionId ? leaseParams.getDagAction().updateFlowExecutionId(dbCurrentTimestamp.getTime()) : leaseParams.getDagAction();
        DagActionStore.LeaseParams updatedLeaseParams = new DagActionStore.LeaseParams(updatedDagAction,
            dbCurrentTimestamp.getTime());
        log.debug("tryAcquireLease for {} - CASE 3: Distinct event, lease is valid", contextualizeLeasing(updatedLeaseParams));
        // Utilize db lease acquisition timestamp for wait time and currentTimestamp as the new eventTimestamp
        return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedLeaseParams,
            dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - dbCurrentTimestamp.getTime());
      } // Lease is invalid
      else if (leaseValidityStatus == 2) {
        log.debug("tryAcquireLease for {} (DB current time: {}) - CASE 4: Lease out-of-date (regardless of "
            + "whether same or distinct event)", contextualizeLeasing(leaseParams), dbCurrentTimestamp.getTime());
        if (isWithinEpsilon && !leaseParams.isReminder) {
          log.warn("Lease should not be out-of-date for the same trigger event, if epsilon << linger for {} "
                  + "(DB eventTimestamp: {}; DB leaseAcquisitionTimestamp: {}; DB linger: {})",
              contextualizeLeasing(leaseParams), dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
        }
        // Use our event to acquire lease, check for previous DB eventTimestamp and leaseAcquisitionTimestamp
        int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement,
            leaseParams.getDagAction(), true,true, dbEventTimestamp,
            dbLeaseAcquisitionTimestamp);
        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, Optional.of(dbCurrentTimestamp),
            adoptConsensusFlowExecutionId);
      } // No longer leasing this event
        if (isWithinEpsilon) {
          log.debug("tryAcquireLease for {} (DB current time: {}) - CASE 5: Same event, no longer leasing event in DB",
              contextualizeLeasing(leaseParams), dbCurrentTimestamp.getTime());
          return new LeaseAttemptStatus.NoLongerLeasingStatus();
        }
        log.debug("tryAcquireLease for {} (DB current time: {}) - CASE 6: Distinct event, no longer leasing event in DB",
            contextualizeLeasing(leaseParams), dbCurrentTimestamp.getTime());
        // Use our event to acquire lease, check for previous DB eventTimestamp and NULL leaseAcquisitionTimestamp
        int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement,
            leaseParams.getDagAction(), true, false, dbEventTimestamp,
            null);
        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, leaseParams, Optional.of(dbCurrentTimestamp),
            adoptConsensusFlowExecutionId);
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }