def writeCheckpoint()

in samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala [360:420]


  /**
   * Writes the given checkpoint for a task, notifies checkpoint listeners, and cleans up startpoints
   * whose SSPs have had their processed offsets updated.
   *
   * The checkpoint part is skipped entirely when the checkpoint is null, or when there is neither a
   * checkpoint manager nor any checkpoint listener. The startpoint cleanup runs independently of
   * whether a checkpoint was written.
   *
   * @param taskName   the task the checkpoint belongs to
   * @param checkpoint the checkpoint to write; may be null, in which case nothing is written
   */
  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit = {
    if (checkpoint != null && (checkpointManager != null || checkpointListeners.nonEmpty)) {
      debug("Writing checkpoint for taskName: %s as: %s." format (taskName, checkpoint))

      // The offsets map is treated as nullable. Wrapping it once in an Option keeps both consumers
      // below (metrics and listeners) null-safe; previously only the metrics update was guarded and
      // the listener path would throw a NullPointerException on a null offsets map.
      val sspToOffsets = Option(checkpoint.getOffsets).map(_.asScala)

      if (checkpointManager != null) {
        checkpointManager.writeCheckpoint(taskName, checkpoint)

        sspToOffsets.foreach(_.foreach { case (ssp, offset) =>
          // metric will be null for changelog SSPs since they're not registered with / tracked by the
          // OffsetManager. Ignore such SSPs.
          Option(offsetManagerMetrics.checkpointedOffsets.get(ssp)).foreach(_.set(offset))
        })
      }

      // Invoke checkpoint listeners only for SSPs that are registered with the OffsetManager. For example,
      // changelog SSPs are not registered but may be present in the Checkpoint if transactional state checkpointing
      // is enabled.
      val registeredSSPs = systemStreamPartitions.getOrElse(taskName, Set[SystemStreamPartition]())
      sspToOffsets.foreach { offsets =>
        offsets
          .filter { case (ssp, _) => registeredSSPs.contains(ssp) }
          .groupBy { case (ssp, _) => ssp.getSystem }
          .foreach { case (systemName, systemOffsets) =>
            // Option is empty if there is no checkpointListener for this systemName
            checkpointListeners.get(systemName).foreach(_.onCheckpoint(systemOffsets.asJava))
          }
      }
    }

    // delete corresponding startpoints after checkpoint is supposed to be committed
    if (startpointManager != null && startpoints.contains(taskName) && taskSSPsWithProcessedOffsetUpdated.containsKey(taskName)) {
      val sspsWithProcessedOffsetUpdated = taskSSPsWithProcessedOffsetUpdated.get(taskName).keySet()
      startpointManager.removeFanOutForTaskSSPs(taskName, sspsWithProcessedOffsetUpdated)

      // Remove the startpoints for the ssps that have received the updates of processed offsets. If all ssps of the
      // task have received the updates of processed offsets, remove the whole task's startpoints.
      startpoints.get(taskName).foreach { sspToStartpoint =>
        val remainingStartpoints = sspToStartpoint
          .filter { case (ssp, _) => !sspsWithProcessedOffsetUpdated.contains(ssp) }
          .toMap
        if (remainingStartpoints.isEmpty) {
          startpoints -= taskName
          info("All startpoints for the taskName: %s have been committed to the checkpoint." format(taskName))
        } else {
          startpoints += taskName -> remainingStartpoints
          debug("Updated the startpoints and the latest startpoints for the task %s: %s" format(taskName, remainingStartpoints))
        }
      }

      // Once no task has outstanding startpoints, the startpoint manager is stopped.
      if (startpoints.isEmpty) {
        info("All outstanding startpoints have been committed to the checkpoint.")
        startpointManager.stop
      }
    }
  }