def run()

in samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala [155:228]


  /**
   * Reads the current checkpoint of every task in the job, logs it, and, when `newOffsets` is
   * non-null, writes a new checkpoint for each task listed in `newOffsets`.
   *
   * The job configuration is read from the coordinator stream and overlaid with
   * `userDefinedConfig`, so user-supplied values take precedence. The first entry of the
   * configured checkpoint read versions selects the checkpoint format:
   *  - version 1 writes a `CheckpointV1` containing only the supplied offsets;
   *  - version 2 writes a `CheckpointV2` with a freshly created checkpoint id. It carries forward
   *    the state checkpoint markers of the task's last checkpoint when that checkpoint is a
   *    `CheckpointV2`, and uses empty markers otherwise.
   *
   * The checkpoint manager (once created) is stopped, and the coordinator stream store is closed,
   * on every exit path, including failures.
   *
   * @throws SamzaException if `task.checkpoint.factory` is not configured, or if the preferred
   *                        checkpoint read version is neither 1 nor 2
   */
  def run(): Unit = {
    try {
      val configFromCoordinatorStream: Config = getConfigFromCoordinatorStream(coordinatorStreamStore)

      println("Configuration read from the coordinator stream")
      println(configFromCoordinatorStream)

      // putAll order matters: user-defined values overwrite coordinator-stream values.
      val combinedConfigMap: util.Map[String, String] = new util.HashMap[String, String]()
      combinedConfigMap.putAll(configFromCoordinatorStream)
      combinedConfigMap.putAll(userDefinedConfig)
      val combinedConfig: Config = new MapConfig(combinedConfigMap)

      val taskConfig = new TaskConfig(combinedConfig)
      // Instantiate the checkpoint manager with coordinator stream configuration.
      val checkpointManager: CheckpointManager =
        JavaOptionals.toRichOptional(taskConfig.getCheckpointManager(new MetricsRegistryMap))
          .toOption
          .getOrElse(throw new SamzaException("Configuration: task.checkpoint.factory is not defined."))

      try {
        // Find all the TaskNames that would be generated for this job config.
        val changelogManager = new ChangelogStreamManager(
          new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
        val jobModelManager = JobModelManager(combinedConfig, changelogManager.readPartitionMapping(),
          coordinatorStreamStore, new MetricsRegistryMap())
        val taskNames = jobModelManager
          .jobModel
          .getContainers
          .values
          .asScala
          .flatMap(_.getTasks.asScala.keys)
          .toSet

        taskNames.foreach(checkpointManager.register)
        checkpointManager.start()

        // The first configured read version is the preferred one; it also decides the format
        // of the checkpoints written below. Unboxed so it can be used as a match scrutinee.
        val checkpointReadVersion: Int = taskConfig.getCheckpointReadVersions.get(0).intValue()

        // Stand-in for tasks that have no stored checkpoint yet.
        val defaultCheckpoint = checkpointReadVersion match {
          case 1 =>
            new CheckpointV1(new util.HashMap[SystemStreamPartition, String]())
          case 2 =>
            new CheckpointV2(CheckpointId.create(), new util.HashMap[SystemStreamPartition, String](),
              new util.HashMap[String, util.Map[String, String]]())
          case _ =>
            throw new SamzaException("Unrecognized checkpoint read version: " + checkpointReadVersion)
        }

        val lastCheckpoints = taskNames.map { taskName =>
          taskName -> Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(defaultCheckpoint)
        }.toMap

        lastCheckpoints.foreach { case (taskName, checkpoint) =>
          logCheckpoint(taskName, checkpoint.getOffsets.asScala.toMap, "Current checkpoint for task: " + taskName)
        }

        Option(newOffsets).foreach(_.foreach { case (taskName, offsets) =>
          logCheckpoint(taskName, offsets, "New offset to be written for task: " + taskName)
          val checkpoint = checkpointReadVersion match {
            case 1 =>
              new CheckpointV1(offsets.asJava)
            case 2 =>
              // Preserve the state checkpoint markers of the last V2 checkpoint. With multiple read
              // versions configured, the last checkpoint may be a CheckpointV1, which has no
              // markers. In that case (or when the task has no checkpoint at all) use empty
              // markers instead of failing with a ClassCastException.
              val lastStateCheckpointMarkers: util.Map[String, util.Map[String, String]] =
                lastCheckpoints.get(taskName) match {
                  case Some(lastV2: CheckpointV2) => lastV2.getStateCheckpointMarkers
                  case _ => new util.HashMap[String, util.Map[String, String]]()
                }
              new CheckpointV2(CheckpointId.create(), offsets.asJava, lastStateCheckpointMarkers)
            case _ =>
              throw new SamzaException("Unrecognized checkpoint read version: " + checkpointReadVersion)
          }
          checkpointManager.writeCheckpoint(taskName, checkpoint)
          info(s"Updated the checkpoint of the task: $taskName to: $offsets")
        })
      } finally {
        checkpointManager.stop()
      }
    } finally {
      // Closed on every path, including failures before the checkpoint manager exists.
      coordinatorStreamStore.close()
    }
  }