in samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala [360:420]
/**
 * Writes the given checkpoint for a task, publishes the checkpointed offsets to metrics, notifies
 * per-system checkpoint listeners, and then cleans up startpoints for the task.
 *
 * The write/notify step is skipped when `checkpoint` is null or when there is neither a checkpoint
 * manager nor any checkpoint listener. The startpoint clean-up runs independently of that condition.
 *
 * @param taskName   the task whose checkpoint is being written
 * @param checkpoint the checkpoint to write; a null value skips the write/notify step
 */
def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit = {
  if (checkpoint != null && (checkpointManager != null || checkpointListeners.nonEmpty)) {
    debug("Writing checkpoint for taskName: %s as: %s." format (taskName, checkpoint))
    // Wrap the offsets once so that BOTH the metrics path and the listener path tolerate a null
    // offsets map (previously only the metrics path was guarded).
    val sspToOffsets = Option(checkpoint.getOffsets).map(_.asScala)

    if (checkpointManager != null) {
      checkpointManager.writeCheckpoint(taskName, checkpoint)
      for {
        offsets <- sspToOffsets
        (ssp, offset) <- offsets
      } {
        // metric will be null for changelog SSPs since they're not registered with / tracked by the
        // OffsetManager. Ignore such SSPs.
        val metric = offsetManagerMetrics.checkpointedOffsets.get(ssp)
        if (metric != null) metric.set(offset)
      }
    }

    // Invoke checkpoint listeners only for SSPs that are registered with the OffsetManager. For example,
    // changelog SSPs are not registered but may be present in the Checkpoint if transactional state
    // checkpointing is enabled.
    val registeredSSPs = systemStreamPartitions.getOrElse(taskName, Set[SystemStreamPartition]())
    sspToOffsets.foreach { offsets =>
      offsets
        .filterKeys(registeredSSPs.contains)
        .groupBy { case (ssp, _) => ssp.getSystem }
        .foreach {
          // Untyped pattern: a type test on Map[K, V] would be unchecked because of erasure.
          case (systemName, systemOffsets) =>
            // Option is empty if there is no checkpointListener for this systemName
            checkpointListeners.get(systemName).foreach(_.onCheckpoint(systemOffsets.asJava))
        }
    }
  }

  // delete corresponding startpoints after checkpoint is supposed to be committed
  if (startpointManager != null && startpoints.contains(taskName) && taskSSPsWithProcessedOffsetUpdated.containsKey(taskName)) {
    val sspsWithProcessedOffsetUpdated = taskSSPsWithProcessedOffsetUpdated.get(taskName).keySet()
    startpointManager.removeFanOutForTaskSSPs(taskName, sspsWithProcessedOffsetUpdated)
    // Remove the startpoints for the ssps that have received the updates of processed offsets. If all ssps of
    // the task have received the updates of processed offsets, remove the whole task's startpoints.
    startpoints.get(taskName).foreach { sspToStartpoint =>
      val newSspToStartpoint = sspToStartpoint.filterKeys(ssp => !sspsWithProcessedOffsetUpdated.contains(ssp)).toMap
      if (newSspToStartpoint.isEmpty) {
        startpoints -= taskName
        info("All startpoints for the taskName: %s have been committed to the checkpoint." format(taskName))
      } else {
        startpoints += taskName -> newSspToStartpoint
        debug("Updated the startpoints and the latest startpoints for the task %s: %s" format(taskName, newSspToStartpoint))
      }
    }
    // Once no task has outstanding startpoints, the startpoint manager is stopped.
    if (startpoints.isEmpty) {
      info("All outstanding startpoints have been committed to the checkpoint.")
      startpointManager.stop
    }
  }
}