in samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala [155:228]
/**
 * Reads the job configuration from the coordinator stream, overlays the user-supplied config,
 * logs the last checkpoint of every task in the job model and, when `newOffsets` is non-null,
 * writes a new checkpoint per task containing those offsets.
 *
 * The format of the written checkpoints follows the first configured checkpoint read version
 * (1 => [[CheckpointV1]], 2 => [[CheckpointV2]]). For version 2, the state checkpoint markers of
 * the task's last checkpoint are carried over when that checkpoint is a [[CheckpointV2]];
 * otherwise the new checkpoint carries an empty marker map.
 *
 * The checkpoint manager (once created) is stopped, and the coordinator stream store is closed,
 * on every exit path, including failures while reading the config or creating the manager.
 *
 * @throws SamzaException if `task.checkpoint.factory` is not defined, no checkpoint read version
 *                        is configured, or the preferred read version is neither 1 nor 2.
 */
def run(): Unit = {
  try {
    val configFromCoordinatorStream: Config = getConfigFromCoordinatorStream(coordinatorStreamStore)
    println("Configuration read from the coordinator stream")
    println(configFromCoordinatorStream)
    // userDefinedConfig is applied last, so its entries override coordinator stream values.
    val combinedConfigMap: util.Map[String, String] = new util.HashMap[String, String]()
    combinedConfigMap.putAll(configFromCoordinatorStream)
    combinedConfigMap.putAll(userDefinedConfig)
    val combinedConfig: Config = new MapConfig(combinedConfigMap)
    val taskConfig = new TaskConfig(combinedConfig)
    // Instantiate the checkpoint manager with the combined configuration.
    val checkpointManager: CheckpointManager =
      JavaOptionals.toRichOptional(taskConfig.getCheckpointManager(new MetricsRegistryMap))
        .toOption
        .getOrElse(throw new SamzaException("Configuration: task.checkpoint.factory is not defined."))
    try {
      // Find all the TaskNames that would be generated for this job config
      val changelogManager = new ChangelogStreamManager(
        new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
      val jobModelManager = JobModelManager(combinedConfig, changelogManager.readPartitionMapping(),
        coordinatorStreamStore, new MetricsRegistryMap())
      val taskNames = jobModelManager
        .jobModel
        .getContainers
        .values
        .asScala
        .flatMap(_.getTasks.asScala.keys)
        .toSet
      taskNames.foreach(checkpointManager.register)
      checkpointManager.start()

      // The preferred (first) read version decides the format of the checkpoints written below.
      val checkpointReadVersion = taskConfig.getCheckpointReadVersions.asScala.headOption
        .getOrElse(throw new SamzaException("No checkpoint read versions are configured."))

      // Stand-in for tasks with no checkpoint yet, in the format of the preferred read version.
      val defaultCheckpoint = if (checkpointReadVersion == 1) {
        new CheckpointV1(new java.util.HashMap[SystemStreamPartition, String]())
      } else if (checkpointReadVersion == 2) {
        new CheckpointV2(CheckpointId.create(), new java.util.HashMap[SystemStreamPartition, String](),
          new java.util.HashMap[String, util.Map[String, String]]())
      } else {
        throw new SamzaException("Unrecognized checkpoint read version: " + checkpointReadVersion)
      }

      val lastCheckpoints = taskNames.map { taskName =>
        taskName -> Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(defaultCheckpoint)
      }.toMap
      lastCheckpoints.foreach { case (taskName, lastCheckpoint) =>
        logCheckpoint(taskName, lastCheckpoint.getOffsets.asScala.toMap,
          "Current checkpoint for task: " + taskName)
      }

      // The last checkpoint may be a CheckpointV1 even when v2 is preferred (e.g. a v1 fallback
      // read), so match on its type instead of casting; only a CheckpointV2 has markers to keep.
      def lastStateCheckpointMarkers(taskName: TaskName): util.Map[String, util.Map[String, String]] =
        lastCheckpoints.get(taskName) match {
          case Some(lastCheckpointV2: CheckpointV2) => lastCheckpointV2.getStateCheckpointMarkers
          case _ => new java.util.HashMap[String, util.Map[String, String]]()
        }

      if (newOffsets != null) {
        newOffsets.foreach {
          case (taskName: TaskName, offsets: Map[SystemStreamPartition, String]) =>
            logCheckpoint(taskName, offsets, "New offset to be written for task: " + taskName)
            val checkpoint = if (checkpointReadVersion == 1) {
              new CheckpointV1(offsets.asJava)
            } else if (checkpointReadVersion == 2) {
              new CheckpointV2(CheckpointId.create(), offsets.asJava, lastStateCheckpointMarkers(taskName))
            } else {
              // Unreachable in practice: defaultCheckpoint already rejected unknown versions.
              throw new SamzaException("Unrecognized checkpoint read version: " + checkpointReadVersion)
            }
            checkpointManager.writeCheckpoint(taskName, checkpoint)
            info(s"Updated the checkpoint of the task: $taskName to: $offsets")
        }
      }
    } finally {
      checkpointManager.stop()
    }
  } finally {
    // Nested finally: the store is closed even if config reading, manager creation or stop() fails.
    coordinatorStreamStore.close()
  }
}