static StoreActions getStoreActions()

in samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java [248:475]


  /**
   * Computes the restore actions for every store of a task: which store directories to retain, which to delete,
   * and which changelog offset range each store must be restored from.
   *
   * <p>Per store, the decision is made as follows:
   * <ul>
   *   <li>non-persisted, non-logged stores are skipped entirely;</li>
   *   <li>persisted, non-logged stores have their current directory marked for deletion and are not restored;</li>
   *   <li>persisted logged stores with clean-on-container-start enabled have their current and checkpoint
   *       directories marked for deletion and are restored from the oldest changelog offset to the checkpointed
   *       offset;</li>
   *   <li>otherwise, the current directory of a persisted store is always marked for deletion, and the restore range
   *       depends on whether the checkpointed changelog offset is missing, outside the current [oldest, newest]
   *       changelog range, or inside it. In the in-range case the target offset is the checkpointed offset, or the
   *       newest offset if the last checkpoint is older than 0.9 * min.compaction.lag.ms. Among the valid checkpoint
   *       directories whose local offset lies in [oldest, target], the one with the highest local offset is retained
   *       and restored from its local offset; every other checkpoint directory is marked for deletion.</li>
   * </ul>
   *
   * @param taskModel the task whose stores are being restored; supplies the task name, mode and changelog partition
   * @param storeEngines storage engines of the task, keyed by store name
   * @param storeChangelogs changelog system stream of each logged store, keyed by store name
   * @param kafkaStateCheckpointMarkers last checkpointed changelog markers, keyed by store name
   * @param checkpointId id of the last checkpoint, or {@code null} if there is none (treated as infinitely old)
   * @param currentChangelogOffsets current oldest/newest offset metadata of each changelog partition
   * @param systemAdmins source of the offset comparator for each changelog system
   * @param storageManagerUtil helper used to locate, validate and read task store directories
   * @param loggedStoreBaseDirectory base directory for logged stores
   * @param nonLoggedStoreBaseDirectory base directory for non-logged stores
   * @param config job config
   * @param clock time source used for the checkpoint-age check; also passed to store directory validation
   * @return the directories to retain, the directories to delete, and the offsets to restore, per store
   */
  static StoreActions getStoreActions(
      TaskModel taskModel,
      Map<String, StorageEngine> storeEngines,
      Map<String, SystemStream> storeChangelogs,
      Map<String, KafkaStateCheckpointMarker> kafkaStateCheckpointMarkers,
      CheckpointId checkpointId,
      Map<SystemStreamPartition, SystemStreamPartitionMetadata> currentChangelogOffsets,
      SystemAdmins systemAdmins,
      StorageManagerUtil storageManagerUtil,
      File loggedStoreBaseDirectory,
      File nonLoggedStoreBaseDirectory,
      Config config,
      Clock clock) {
    TaskName taskName = taskModel.getTaskName();
    TaskMode taskMode = taskModel.getTaskMode();

    // Config wrappers are the same for every store; build them once. Their getters are still only invoked in the
    // branches that need them.
    StorageConfig storageConfig = new StorageConfig(config);
    TaskConfig taskConfig = new TaskConfig(config);

    // Age of the last checkpoint, measured with the injected clock (the same clock passed to isLoggedStoreValid
    // below) rather than wall-clock time. Long.MAX_VALUE when there is no checkpoint.
    long timeSinceLastCheckpointInMs = checkpointId == null
        ? Long.MAX_VALUE
        : clock.currentTimeMillis() - checkpointId.getMillis();

    Map<String, File> storeDirToRetain = new HashMap<>();
    ListMultimap<String, File> storeDirsToDelete = ArrayListMultimap.create();
    Map<String, RestoreOffsets> storesToRestore = new HashMap<>();

    storeEngines.forEach((storeName, storageEngine) -> {
      boolean isPersistedToDisk = storageEngine.getStoreProperties().isPersistedToDisk();
      boolean isLoggedStore = storageEngine.getStoreProperties().isLoggedStore();

      // do nothing if store is non persistent and not logged (e.g. in memory cache only)
      if (!isPersistedToDisk && !isLoggedStore) {
        return;
      }

      // persistent but non-logged stores are always deleted
      if (isPersistedToDisk && !isLoggedStore) {
        File currentDir = storageManagerUtil.getTaskStoreDir(
            nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
        LOG.info("Marking current directory: {} for store: {} in task: {} for deletion since it is not a logged store.",
            currentDir, storeName, taskName);
        storeDirsToDelete.put(storeName, currentDir);
        // persistent but non-logged stores should not have checkpoint dirs
        return;
      }

      // get the oldest and newest current changelog SSP offsets as well as the checkpointed changelog SSP offset
      SystemStream changelog = storeChangelogs.get(storeName);
      SystemStreamPartition changelogSSP = new SystemStreamPartition(changelog, taskModel.getChangelogPartition());
      SystemAdmin admin = systemAdmins.getSystemAdmin(changelogSSP.getSystem());
      SystemStreamPartitionMetadata changelogSSPMetadata = currentChangelogOffsets.get(changelogSSP);
      String oldestOffset = changelogSSPMetadata.getOldestOffset();
      String newestOffset = changelogSSPMetadata.getNewestOffset();

      // null if there is no checkpoint marker for this store, or the marker's changelog offset is blank
      KafkaStateCheckpointMarker checkpointMarker = kafkaStateCheckpointMarkers.get(storeName);
      String markerOffset = checkpointMarker == null ? null : checkpointMarker.getChangelogOffset();
      String checkpointedOffset = StringUtils.isNotBlank(markerOffset) ? markerOffset : null;

      // if the clean.store.start config is set, delete current and checkpoint dirs, restore from oldest offset to checkpointed
      if (isPersistedToDisk && storageConfig.cleanLoggedStoreDirsOnStart(storeName)) {
        File currentDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskName, taskMode);
        LOG.info("Marking current directory: {} for store: {} in task: {} for deletion due to clean.on.container.start config.",
            currentDir, storeName, taskName);
        storeDirsToDelete.put(storeName, currentDir);

        storageManagerUtil.getTaskStoreCheckpointDirs(loggedStoreBaseDirectory, storeName, taskName, taskMode)
            .forEach(checkpointDir -> {
              LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion due to clean.on.container.start config.",
                  checkpointDir, storeName, taskName);
              storeDirsToDelete.put(storeName, checkpointDir);
            });

        LOG.info("Marking restore offsets for store: {} in task: {} to {}, {} ", storeName, taskName, oldestOffset, checkpointedOffset);
        storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
        return;
      }

      // non-persistent logged stores have no local directories at all
      Optional<File> currentDirOptional;
      Optional<List<File>> checkpointDirsOptional;

      if (!isPersistedToDisk) {
        currentDirOptional = Optional.empty();
        checkpointDirsOptional = Optional.empty();
      } else {
        currentDirOptional = Optional.of(storageManagerUtil.getTaskStoreDir(
            loggedStoreBaseDirectory, storeName, taskName, taskMode));
        checkpointDirsOptional = Optional.of(storageManagerUtil.getTaskStoreCheckpointDirs(
            loggedStoreBaseDirectory, storeName, taskName, taskMode));
      }

      LOG.info("For store: {} in task: {} got current dir: {}, checkpoint dirs: {}, checkpointed changelog offset: {}",
          storeName, taskName, currentDirOptional, checkpointDirsOptional, checkpointedOffset);

      // the current (non-checkpoint) store directory is always rebuilt from a checkpoint dir and/or the changelog
      currentDirOptional.ifPresent(currentDir -> {
        LOG.info("Marking current directory: {} for store: {} in task: {} for deletion.",
            currentDir, storeName, taskName);
        storeDirsToDelete.put(storeName, currentDir);
      });

      if (checkpointedOffset == null && oldestOffset != null) {
        // this can mean that either this is the initial migration for this feature and there are no previously
        // checkpointed changelog offsets, or that this is a new store or changelog topic after the initial migration.

        // if this is the first time migration, it might be desirable to retain existing data.
        // if this is new store or topic, it is possible that the container previously died after writing some data to
        // the changelog but before a commit, so it is desirable to delete the store, not restore anything and
        // trim the changelog

        // since we can't tell the difference b/w the two scenarios by just looking at the store and changelogs,
        // we'll request users to indicate whether to retain existing data using a config flag. this flag should only
        // be set during migrations, and turned off after the first successful commit of the new container (i.e. next
        // deploy). for simplicity, we'll always delete the local store, and restore from changelog if necessary.

        // the former scenario should not be common. the recommended way to opt-in to the transactional state feature
        // is to first upgrade to the latest samza version but keep the transactional state restore config off.
        // this will create the store checkpoint directories and write the changelog offset to the checkpoint, but
        // will not use them during restore. once this is done (i.e. at least one commit after upgrade), the
        // transactional state restore feature can be turned on on subsequent deploys. this code path exists as a
        // fail-safe against clearing changelogs in case users do not follow upgrade instructions and enable the
        // feature directly.
        checkpointDirsOptional.ifPresent(checkpointDirs ->
            checkpointDirs.forEach(checkpointDir -> {
              LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion since checkpointed " +
                      "offset is null and oldest offset: {} is not.",
                  checkpointDir, storeName, taskName, oldestOffset);
              storeDirsToDelete.put(storeName, checkpointDir);
            }));

        if (taskConfig.getTransactionalStateRetainExistingState()) {
          // mark for restore from (oldest, newest) to recreate local state.
          LOG.warn("Checkpointed offset for store: {} in task: {} is null. Since retain existing state is true, " +
              "local state will be fully restored from current changelog contents. " +
              "There is no transactional local state guarantee.", storeName, taskName);
          storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, newestOffset));
        } else {
          LOG.warn("Checkpointed offset for store: {} in task: {} is null. Since retain existing state is false, " +
              "any local state and changelog topic contents will be deleted.", storeName, taskName);
          // mark for restore from (oldest, null) to trim entire changelog.
          storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, null));
        }
      } else if (// check if the checkpointed offset is out of range of current oldest and newest offsets
          admin.offsetComparator(oldestOffset, checkpointedOffset) > 0 ||
          admin.offsetComparator(checkpointedOffset, newestOffset) > 0) {
        // checkpointed offset is out of range. this could mean that this is a TTL topic and the checkpointed
        // offset was TTLd, or that the changelog topic was manually deleted and then recreated.
        // we cannot guarantee transactional state for TTL stores, so delete everything and do a full restore
        // for local store. if the topic was deleted and recreated, this will have the side effect of
        // clearing the store as well.
        LOG.warn("Checkpointed offset: {} for store: {} in task: {} is out of range of oldest: {} or newest: {} offset. " +
                "Deleting existing store and fully restoring from changelog topic from oldest to newest offset. If the topic " +
                "has time-based retention, there are no transactional local state guarantees. If the topic was changed, " +
                "local state will be cleaned up and fully restored to match the new topic contents.",
            checkpointedOffset, storeName, taskName, oldestOffset, newestOffset);
        checkpointDirsOptional.ifPresent(checkpointDirs ->
            checkpointDirs.forEach(checkpointDir -> storeDirsToDelete.put(storeName, checkpointDir)));
        storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, newestOffset));
      } else { // happy path. checkpointed offset is in range of current oldest and newest offsets
        if (!checkpointDirsOptional.isPresent()) { // non-persistent logged store
          LOG.info("Did not find any checkpoint directories for logged (maybe non-persistent) store: {}. Local state " +
              "will be fully restored from current changelog contents.", storeName);
          storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
        } else { // persistent logged store
          List<File> checkpointDirs = checkpointDirsOptional.get();
          String targetOffset;

          // check checkpoint time against min.compaction.lag.ms. if older, restore from checkpointed offset to newest
          // with no trim. be conservative. allow 10% safety margin to avoid deletions when the downtime is close
          // to min.compaction.lag.ms
          long minCompactionLagMs = storageConfig.getChangelogMinCompactionLagMs(storeName);
          if (timeSinceLastCheckpointInMs > .9 * minCompactionLagMs) {
            LOG.warn("Checkpointed offset for store: {} in task: {} is: {}. It is in range of oldest: {} and " +
                "newest: {} changelog offset. However, time since last checkpoint is: {}, which is greater than " +
                "0.9 * min.compaction.lag.ms: {} for the changelog topic. Since there is a chance that " +
                "the changelog topic has been compacted, restoring store to the end of the current changelog contents. " +
                "There is no transactional local state guarantee.", storeName, taskName, checkpointedOffset,
                oldestOffset, newestOffset, timeSinceLastCheckpointInMs, minCompactionLagMs);
            targetOffset = newestOffset;
          } else {
            targetOffset = checkpointedOffset;
          }

          // if there exists a valid store checkpoint directory with oldest offset <= local offset <= target offset,
          // retain it and restore the delta. delete all other checkpoint directories for the store. if more than one such
          // checkpoint directory exists, retain the one with the highest local offset and delete the rest.
          boolean hasValidCheckpointDir = false;
          for (File checkpointDir : checkpointDirs) {
            if (storageManagerUtil.isLoggedStoreValid(
                storeName, checkpointDir, config, storeChangelogs, taskModel, clock, storeEngines)) {
              String localOffset = storageManagerUtil.readOffsetFile(
                  checkpointDir, Collections.singleton(changelogSSP), false).get(changelogSSP);
              LOG.info("Read local offset: {} for store: {} checkpoint dir: {} in task: {}", localOffset, storeName,
                  checkpointDir, taskName);

              if (admin.offsetComparator(localOffset, oldestOffset) >= 0 &&
                  admin.offsetComparator(localOffset, targetOffset) <= 0 &&
                  (storesToRestore.get(storeName) == null ||
                      admin.offsetComparator(localOffset, storesToRestore.get(storeName).startingOffset) > 0)) {
                hasValidCheckpointDir = true;
                LOG.info("Temporarily marking checkpoint dir: {} for store: {} in task: {} for retention. " +
                    "May be overridden later.", checkpointDir, storeName, taskName);
                storeDirToRetain.put(storeName, checkpointDir);
                // mark for restore even if local == checkpointed, so that the changelog gets trimmed.
                LOG.info("Temporarily marking store: {} in task: {} for restore from beginning offset: {} to " +
                    "ending offset: {}. May be overridden later", storeName, taskName, localOffset, targetOffset);
                storesToRestore.put(storeName, new RestoreOffsets(localOffset, targetOffset));
              }
            }
          }

          // delete all non-retained checkpoint directories
          for (File checkpointDir : checkpointDirs) {
            if (storeDirToRetain.get(storeName) == null ||
                !storeDirToRetain.get(storeName).equals(checkpointDir)) {
              LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion since it is not " +
                  "marked for retention.", checkpointDir, storeName, taskName);
              storeDirsToDelete.put(storeName, checkpointDir);
            }
          }

          // if the store had no valid checkpoint dirs to retain, restore from changelog
          if (!hasValidCheckpointDir) {
            storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, targetOffset));
          }
        }
      }
    });

    LOG.info("Store directories to be retained in Task: {} are: {}", taskName, storeDirToRetain);
    LOG.info("Store directories to be deleted in Task: {} are: {}", taskName, storeDirsToDelete);
    LOG.info("Stores to be restored in Task: {} are: {}", taskName, storesToRestore);
    return new StoreActions(storeDirToRetain, storeDirsToDelete, storesToRestore);
  }