app/modules/clustersync/ClusterSynchronisation.scala (95 lines of code) (raw):
package modules.clustersync
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import javax.inject._
import play.api.Logging
import repositories.{TagLookupCache, SectionLookupCache}
import services.{Config, KinesisConsumer}
import scala.jdk.CollectionConverters._
import com.google.common.util.concurrent.{ServiceManager, AbstractScheduledService}
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler
import play.api.inject.ApplicationLifecycle
import scala.concurrent.Future
import scala.util.control.NonFatal
import scala.jdk.CollectionConverters._
/**
 * Keeps this node registered in the cluster and keeps the tag cache in sync.
 *
 * On construction the component registers a stop hook with the Play
 * [[ApplicationLifecycle]], starts the heartbeat service and performs a first
 * [[initialise]]. From then on [[heartbeat]] is driven by
 * [[NodeStatusHeartbeater]]: it either refreshes the current reservation or,
 * when there is none, attempts a fresh [[initialise]].
 *
 * @param lifecycle Play lifecycle used to run [[stop]] on application shutdown
 */
@Singleton
class ClusterSynchronisation @Inject() (lifecycle: ApplicationLifecycle) extends Logging {

  /** The node's current cluster registration, or `None` while unregistered/paused. */
  val reservation: AtomicReference[Option[NodeStatus]] =
    new AtomicReference[Option[NodeStatus]](None)

  /** The running tag-update Kinesis consumer, or `None` while paused. */
  val tagCacheSynchroniser: AtomicReference[Option[KinesisConsumer]] =
    new AtomicReference[Option[KinesisConsumer]](None)

  /** Holder for a section-update consumer; never written to within this class. */
  val sectionCacheSynchroniser: AtomicReference[Option[KinesisConsumer]] =
    new AtomicReference[Option[KinesisConsumer]](None)

  // Created after the state holders above so that `this` is only handed to the
  // heartbeater once the fields it reads through `heartbeat` are initialised.
  val serviceManager: ServiceManager =
    new ServiceManager(List(new NodeStatusHeartbeater(this)).asJava)

  lifecycle.addStopHook { () => Future.successful(stop()) }

  serviceManager.startAsync()

  initialise

  /**
   * Registers this node, loads the tag cache and starts the tag-update consumer.
   *
   * A `HeartbeatException` is logged and otherwise ignored (the next heartbeat
   * retries). Any other non-fatal failure is logged and followed by [[pause]],
   * which rolls back whatever was already set up.
   */
  def initialise: Unit = {
    try {
      logger.info("starting sync components...")
      val ns = NodeStatusRepository.register()
      reservation.set(Some(ns))

      logger.info("loading tag cache")
      TagLookupCache.refresh

      // NOTE: the spelling of this name is part of the runtime identifier; do not "fix" it.
      val appName = s"tag-cache-syncroniser-${Config().aws.stage}-${ns.nodeId}"
      logger.info(s"Starting tag sync kinesis consumer with appName: $appName")
      val tagUpdateConsumer = new KinesisConsumer(Config().tagUpdateStreamName, appName, TagSyncUpdateProcessor)

      logger.info("starting tag sync consumer")
      tagUpdateConsumer.start()
      tagCacheSynchroniser.set(Some(tagUpdateConsumer))
    } catch {
      case he: HeartbeatException =>
        // Include the cause so registration failures can be diagnosed from the log.
        logger.error("failed to register in the cluster, will try again next heartbeat", he)
      case NonFatal(e) =>
        logger.error("failed to start sync", e)
        pause
    }
  }

  /**
   * Stops the tag-update consumer (if any) and deregisters the node (if registered).
   *
   * Both references are cleared atomically before the corresponding teardown
   * call, and each teardown step is best-effort: a failure while stopping the
   * consumer no longer prevents deregistration, and neither failure leaves a
   * stale consumer/reservation behind. With both cleared, the next
   * [[heartbeat]] takes the re-initialise path.
   */
  def pause: Unit = {
    logger.warn("pausing cluster synchronisation")

    tagCacheSynchroniser.getAndSet(None).foreach { consumer =>
      logger.warn("stopping consumer")
      bestEffort("stopping tag sync consumer")(consumer.stop())
    }

    reservation.getAndSet(None).foreach { ns =>
      logger.warn("deregistering node")
      bestEffort("deregistering node")(NodeStatusRepository.deregister(ns))
    }
  }

  /**
   * One heartbeat tick: refreshes the reservation when registered, otherwise
   * tries to [[initialise]] again.
   *
   * A `HeartbeatException` from the repository triggers [[pause]]; any other
   * non-fatal error is logged and swallowed so the exception does not escape
   * to the scheduled service that calls this method.
   */
  def heartbeat: Unit = {
    try {
      reservation.get() match {
        case Some(ns) =>
          try {
            logger.info(s"heartbeating as node ${ns.nodeId}...")
            reservation.set(Some(NodeStatusRepository.heartbeat(ns)))
          } catch {
            case he: HeartbeatException =>
              logger.error("heartbeat failed", he)
              pause
          }
        case None => initialise
      }
    } catch {
      case NonFatal(e) => logger.error("Error heartbeating", e)
    }
  }

  /**
   * Shuts synchronisation down: stops the heartbeater (waiting up to 10
   * seconds), then stops the tag-update consumer and deregisters the node.
   *
   * Each step is best-effort. In particular a timeout while waiting for the
   * heartbeater no longer skips stopping the consumer and deregistering the
   * node.
   */
  def stop(): Unit = {
    logger.info("shutting down synchronisation")

    logger.info("stopping heartbeater")
    bestEffort("stopping heartbeater") {
      serviceManager.stopAsync()
      logger.info("awaiting service runner stop")
      // Throws TimeoutException if the services have not stopped in time.
      serviceManager.awaitStopped(10, TimeUnit.SECONDS)
    }

    logger.info("heartbeater stopped, stopping tag cache sync")
    tagCacheSynchroniser.get.foreach { consumer =>
      bestEffort("stopping tag sync consumer")(consumer.stop())
    }

    logger.info("deregistering node")
    reservation.get.foreach { ns =>
      bestEffort("deregistering node")(NodeStatusRepository.deregister(ns))
    }

    logger.info("synchronisation shutdown complete")
  }

  /** Runs one teardown step, logging (rather than propagating) any non-fatal failure. */
  private def bestEffort(step: String)(action: => Any): Unit = {
    try {
      action
      ()
    } catch {
      case NonFatal(e) => logger.error(s"$step failed, continuing", e)
    }
  }
}
/**
 * Guava scheduled service that triggers a cluster heartbeat on a fixed delay.
 *
 * @param clusterSyncronisation the synchronisation component whose `heartbeat` is invoked on every tick
 */
class NodeStatusHeartbeater(clusterSyncronisation: ClusterSynchronisation) extends AbstractScheduledService {

  /** Performs a single tick by delegating to the synchronisation component. */
  override def runOneIteration(): Unit = {
    clusterSyncronisation.heartbeat
  }

  /** Fixed-delay schedule: one minute before the first tick, one minute between ticks. */
  override def scheduler(): Scheduler = {
    val initialDelay = 1L
    val delayBetweenTicks = 1L
    Scheduler.newFixedDelaySchedule(initialDelay, delayBetweenTicks, TimeUnit.MINUTES)
  }
}