app/services/CommissionStatusPropagator.scala (140 lines of code) (raw):

package services import java.util.UUID import akka.actor.{Actor, ActorRef, Props} import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.module.scala.JsonScalaEnumeration import com.google.inject.Inject import models.{EntryStatus, ProjectEntry, ProjectEntryRow, ProjectEntrySerializer, EntryStatusMapper} import org.slf4j.LoggerFactory import play.api.Logger import play.api.db.slick.DatabaseConfigProvider import play.api.libs.json.Writes import services.RabbitMqPropagator._ import slick.jdbc.PostgresProfile import slick.jdbc.PostgresProfile.api._ import javax.inject.Named import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.util.{Failure, Success, Try} import scala.concurrent.duration._ import scala.util.control.NonFatal object CommissionStatusPropagator { private val logger = LoggerFactory.getLogger(getClass) def props = Props[CommissionStatusPropagator] trait CommissionStatusEvent { val uuid:UUID } class EnumStatusType extends TypeReference[EntryStatus.type] {} case class CommissionStatusUpdate(commissionId:Int, @JsonScalaEnumeration(classOf[EnumStatusType]) newValue:EntryStatus.Value, override val uuid:UUID) extends CommissionStatusEvent with JacksonSerializable case class RetryFromState(override val uuid: UUID) extends JacksonSerializable with CommissionStatusEvent object CommissionStatusUpdate { def apply(commissionId:Int, newStatus:EntryStatus.Value) = new CommissionStatusUpdate(commissionId, newStatus, UUID.randomUUID()) } case class EventHandled(uuid: UUID) extends CommissionStatusEvent } /** * this actor allows commission status changes to apply to their contained projects. * * the logic table is as follows: * Commission “Completed” → all projects NOT Completed or Killed should be set to Completed * Commission “Held” → all projects NOT Completed or Killed or Held should be set to Held * Commission “In Progress” → no change * Commission "Killed" -> all projects NOT Completed or Killed should be set to Killed * @param configuration * @param dbConfigProvider */ class CommissionStatusPropagator @Inject() (dbConfigProvider: DatabaseConfigProvider, @Named("rabbitmq-propagator") implicit val rabbitMqPropagator: ActorRef)(implicit system: akka.actor.ActorSystem) extends Actor with models.ProjectEntrySerializer { import CommissionStatusPropagator._ import scala.concurrent.duration._ private final var state: CommissionStatusPropagatorState = CommissionStatusPropagatorState(Map()) private final var restoreCompleted = false val logger = Logger(getClass) val dbConfig = dbConfigProvider.get[PostgresProfile] // Use a mutable map to track handled events private val handledEvents = scala.collection.mutable.Set[UUID]() private def withRetry[T](maxRetries: Int, delay: FiniteDuration)(block: => Future[T]): Future[T] = { block.recoverWith { case NonFatal(e) if maxRetries > 0 => logger.warn(s"Operation failed, retrying in $delay... ($maxRetries retries left)", e) akka.pattern.after(delay, using = system.scheduler)(withRetry(maxRetries - 1, delay)(block)) } } def confirmHandled(evtAsObject: CommissionStatusEvent): Unit = { handledEvents += evtAsObject.uuid logger.debug(s"Marked event ${evtAsObject.uuid} as handled") state = state.removed(evtAsObject) } override def receive: Receive = { case RetryFromState => if (state.size != 0) logger.warn(s"CommissionStatusPropagator retrying ${state.size} events from state") state.foreach { stateEntry => logger.warn(s"Retrying event ${stateEntry._1}") self ! stateEntry._2 } case evt@CommissionStatusUpdate(commissionId, newStatus, uuid) => val originalSender = sender() logger.info(s"$uuid: Received notification that commission $commissionId changed to $newStatus") val futureResult = withRetry(3, 1.second) { updateCommissionProjects(newStatus, commissionId) } futureResult.onComplete { case Failure(err) => logger.error(s"Could not fetch project entries for $commissionId to $newStatus: ", err) originalSender ! akka.actor.Status.Failure(err) case Success(updatedProjects) => val successfulProjects = updatedProjects.collect { case Success(project) => project } logger.info(s"Project status change to $newStatus for ${successfulProjects.length} projects.") originalSender ! akka.actor.Status.Success(successfulProjects.length) } confirmHandled(evt) } def updateCommissionProjects(newStatus: EntryStatus.Value, commissionId: Int, projectsToVerify: Set[Int] = Set.empty): Future[Seq[Try[Int]]] = { val action: DBIO[Seq[(Int, ProjectEntry)]] = ProjectEntry.getProjectsEligibleForStatusChange(newStatus, commissionId) dbConfig.db.run(action).flatMap { projectTuples => if (projectTuples.isEmpty) { logger.info(s"No projects found needing status update to $newStatus for commission $commissionId") Future.successful(Seq.empty) } else { logger.info(s"Found ${projectTuples.length} projects to update to $newStatus for commission $commissionId") logger.info(s"Project IDs to update: ${projectTuples.map(_._1).mkString(", ")}") // Process updates sequentially using foldLeft projectTuples.foldLeft(Future.successful(Seq.empty[Try[Int]])) { case (accFuture, (id, project)) => accFuture.flatMap { acc => val updatedProject = project.copy(status = newStatus) val updateAction = (for { _ <- DBIO.successful(logger.info(s"Starting transaction for project $id")) updateCount <- TableQuery[ProjectEntryRow].filter(_.id === id).update(updatedProject) _ = logger.info(s"Database update for project $id completed with count: $updateCount") verification <- TableQuery[ProjectEntryRow].filter(_.id === id).result.headOption _ = verification match { case Some(updated) if updated.status == newStatus => logger.info(s"Verified project $id is now status: ${updated.status}") case Some(updated) => logger.warn(s"Project $id status mismatch - expected: $newStatus, actual: ${updated.status}") throw new Exception(s"Project $id status mismatch - expected: $newStatus, actual: ${updated.status}") case None => logger.error(s"Could not verify project $id - not found after update") throw new Exception(s"Project $id not found after update") } } yield (updateCount, verification)).transactionally dbConfig.db.run(updateAction).map { case (count, Some(updated)) if updated.status == newStatus => val projectSerializer = new ProjectEntrySerializer {} implicit val projectsWrites: Writes[ProjectEntry] = projectSerializer.projectEntryWrites rabbitMqPropagator.tell( ChangeEvent(Seq(projectsWrites.writes(updatedProject)), Some("project"), UpdateOperation()), Actor.noSender ) logger.info(s"Successfully updated project $id and sent to RabbitMQ") acc :+ Success(id) case (_, Some(updated)) => logger.error(s"Project $id update verification failed - status mismatch") acc :+ Failure(new Exception(s"Project $id update verification failed")) case (_, None) => logger.error(s"Project $id update verification failed - project not found") acc :+ Failure(new Exception(s"Project $id not found after update")) }.recover { case err => logger.error(s"Failed to update project $id", err) acc :+ Failure(err) } } }.map { results => val (successes, failures) = results.partition(_.isSuccess) logger.info(s"Update complete: ${successes.length} successes, ${failures.length} failures") if (failures.nonEmpty) { logger.error(s"Failed project updates: ${failures.map(_.failed.get.getMessage).mkString(", ")}") } results } } } } }