app/agent/collector.scala (210 lines of code) (raw):

package agent

import org.apache.pekko.actor.ActorSystem
import net.logstash.logback.marker.Markers
import org.joda.time.DateTime
import org.apache.pekko.agent.Agent
import utils._

import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.{Failure, Random, Success}

/** Holds one [[ScheduledAgent]] per collector of a [[CollectorSet]] and
  * exposes the most recently stored [[Datum]] of each.
  *
  * @param collectorSet      the set whose collectors are managed by this agent
  * @param sourceStatusAgent receives the label of every datum produced here
  * @param lazyStartup       when `true`, `init()` seeds each agent with
  *                          `Datum.empty` instead of performing an initial
  *                          crawl
  * @param actorSystem       used to look up the `collectorAgent` dispatcher
  */
class CollectorAgent[T <: IndexedItem](
    val collectorSet: CollectorSet[T],
    sourceStatusAgent: SourceStatusAgent,
    lazyStartup: Boolean = true
)(actorSystem: ActorSystem)
    extends CollectorAgentTrait[T]
    with Logging
    with Marker
    with LifecycleWithoutApp {

  implicit private val collectorAgent: ExecutionContext =
    actorSystem.dispatchers.lookup("collectorAgent")

  val collectors: Seq[Collector[T]] = collectorSet.collectors

  /** Resource name of the first collector, if there is one. */
  val resourceName: Option[String] = collectors.headOption.map(_.resource.name)

  // Populated by init() and cleared by shutdown(); empty in between.
  private var datumAgents: Map[Collector[T], ScheduledAgent[Datum[T]]] =
    Map.empty

  log.info(
    s"new collector set - ${collectorSet.resource.name} - with collectors ${collectors}"
  )

  /** Current datum for `collector`.
    *
    * Throws `NoSuchElementException` when no agent is registered for the
    * collector (before `init()`, after `shutdown()`, or when its refresh
    * period is not finite).
    */
  def get(collector: Collector[T]): Datum[T] = datumAgents(collector)()

  /** Current datum of every registered agent. */
  def get(): Iterable[Datum[T]] = datumAgents.values.map(_())

  /** Every data item paired with the label of the datum it belongs to. */
  def getTuples: Iterable[(Label, T)] =
    get().flatMap(datum => datum.data.map(datum.label -> _))

  /** Labels of all current data. */
  def getLabels: Seq[Label] = get().map(_.label).toSeq

  /** Total number of data items across all current data. */
  def size: Int = get().map(_.data.size).sum

  override def toMarkerMap: Map[String, Any] =
    Map("records" -> size, "durationType" -> "crawl")

  /** Builds a fresh datum for `collector`, reports its label to the
    * [[SourceStatusAgent]] and logs the outcome.
    *
    * @param collector the collector to build a datum from
    * @param previous  the datum currently held for this collector
    * @return the fresh datum when its label carries no error, otherwise
    *         `previous` (so earlier data is kept on failure)
    */
  def update(collector: Collector[T], previous: Datum[T]): Datum[T] = {
    val s = new StopWatch
    val datum = Datum[T](collector)
    val timeSpent = s.elapsed
    sourceStatusAgent.update(datum.label)

    datum.label match {
      case l @ Label(product, origin, size, _, None) =>
        val marker = Markers.appendEntries(
          (origin.toMarkerMap ++ l.toMarkerMap ++ this.toMarkerMap ++ Map(
            "duration" -> timeSpent
          )).asJava
        )
        log.info(
          s"Crawl of ${product.name} from $origin successful (${timeSpent}ms): $size records, ${l.bestBefore}"
        )(marker)
        datum

      case l @ Label(product, origin, _, _, Some(error)) =>
        val marker = Markers.appendEntries(
          (l.toMarkerMap ++ this.toMarkerMap ++ Map(
            "duration" -> timeSpent
          )).asJava
        )
        // Severity depends on what we are left holding: nothing usable,
        // stale data, or data that is not yet stale.
        previous.label match {
          case bad if bad.isError =>
            log.error(
              s"Crawl of ${product.name} from $origin failed (${timeSpent}ms): NO data available as this has not been crawled successfully since Prism started",
              error
            )(marker)
          case stale if stale.bestBefore.isStale =>
            log.error(
              s"Crawl of ${product.name} from $origin failed (${timeSpent}ms): leaving previously crawled STALE data (${stale.bestBefore.age.getStandardSeconds} seconds old)",
              error
            )(marker)
          case notYetStale if !notYetStale.bestBefore.isStale =>
            log.warn(
              s"Crawl of ${product.name} from $origin failed (${timeSpent}ms): leaving previously crawled data (${notYetStale.bestBefore.age.getStandardSeconds} seconds old)",
              error
            )(marker)
          // this shouldn't happen, but we need to be exhaustive
          case _ =>
            log.warn(
              s"Crawl of ${product.name} from $origin failed (${timeSpent}ms)",
              error
            )(marker)
        }
        previous
    }
  }

  /** Creates and registers a [[ScheduledAgent]] for every collector whose
    * refresh period is a `FiniteDuration`.
    *
    * With `lazyStartup` each agent starts from `Datum.empty`; otherwise an
    * initial `update` is run inline and asserted to be error free. Collectors
    * with a non-finite refresh period get no agent and a warning is logged.
    */
  def init(): Unit = {
    log.info(s"Starting agent for collectors: $collectors")

    datumAgents = collectors.flatMap { collector =>
      val initial =
        if (lazyStartup) {
          val startupData = Datum.empty[T](collector)
          sourceStatusAgent.update(startupData.label)
          startupData
        } else {
          val startupData = update(collector, Datum.empty[T](collector))
          assert(
            !startupData.label.isError,
            s"Error occurred collecting data when lazy startup is disabled: ${startupData.label}"
          )
          startupData
        }

      val agent: Option[ScheduledAgent[Datum[T]]] =
        collector.crawlRate.refreshPeriod match {
          case fd: FiniteDuration =>
            // Random value in [0, 60) seconds, drawn per collector.
            // NOTE(review): presumably the initial delay, with `fd` as the
            // repeat period — confirm against ScheduledAgent.
            val randomDelay = new Random().nextInt(60)
            Some(
              ScheduledAgent[Datum[T]](randomDelay.seconds, fd, initial) {
                previous => update(collector, previous)
              }
            )
          case _ =>
            log.warn(
              s"The crawl rate period for $collector is ${collector.crawlRate.refreshPeriod}. This is not a finite duration so we are not initialising an agent."
            )
            None
        }

      agent.map(collector -> _)
    }.toMap

    log.info(s"Started agent for collectors: $collectors")
  }

  /** Shuts down every registered agent and forgets them. */
  def shutdown(): Unit = {
    datumAgents.values.foreach(_.shutdown())
    datumAgents = Map.empty
  }
}

/** Operations offered by a collector agent for items of type `T`. */
trait CollectorAgentTrait[T <: IndexedItem] {
  def get(collector: Collector[T]): Datum[T]

  def get(): Iterable[Datum[T]]

  def getTuples: Iterable[(Label, T)]

  def getLabels: Seq[Label]

  def size: Int

  def update(collector: Collector[T], previous: Datum[T]): Datum[T]

  def init(): Unit

  def shutdown(): Unit
}

/** Status of one source.
  *
  * @param state the label stored as the source's state
  * @param error the label of the latest update if that update was an error
  */
case class SourceStatus(state: Label, error: Option[Label] = None) {

  /** The error label when present, otherwise `state`. */
  lazy val latest: Label = error.getOrElse(state)
}

/** Tracks a [[SourceStatus]] per `(ResourceType, Origin)` pair and logs
  * healthcheck progress as labels arrive.
  *
  * @param actorSystem           used to look up the `collectorAgent` dispatcher
  * @param prismRunTimeStopWatch stopwatch whose elapsed value is reported as
  *                              the healthcheck duration
  */
class SourceStatusAgent(
    actorSystem: ActorSystem,
    prismRunTimeStopWatch: StopWatch
) extends Logging
    with Marker {

  implicit private val collectorAgent: ExecutionContext =
    actorSystem.dispatchers.lookup("collectorAgent")

  val sourceStatusAgent: Agent[Map[(ResourceType, Origin), SourceStatus]] =
    Agent(Map.empty)

  // Resource types for which the healthcheck has already been logged as passed.
  // NOTE(review): written from onComplete callbacks on the dispatcher above;
  // mutable.Map is not thread-safe — confirm callbacks cannot run concurrently.
  val initialisedResources: mutable.Map[ResourceType, Boolean] = mutable.Map()

  /** Records `label` against its `(resourceType, origin)` key.
    *
    * A non-error label replaces the stored status. An error label keeps the
    * previously stored state (or becomes the state if there was none) and is
    * recorded as the error. Once the alteration completes, healthcheck
    * progress is logged until the label's resource type has been marked as
    * initialised.
    */
  def update(label: Label): Unit = {
    sourceStatusAgent
      .alter { previousMap =>
        val key = (label.resourceType, label.origin)
        val previous = previousMap.get(key)
        val next = label match {
          case good if !good.isError => SourceStatus(good)
          case bad =>
            SourceStatus(previous.map(_.state).getOrElse(bad), Some(bad))
        }
        previousMap + (key -> next)
      }
      .onComplete {
        case Success(newMap) =>
          if (!initialisedResources.getOrElse(label.resourceType, false)) {
            val timeSpent = prismRunTimeStopWatch.elapsed
            // Counted over every entry in the map, not only this resource type.
            val uninitialisedSources =
              newMap.values.count(_.state.status != "success")
            val crawledSources = newMap.size - uninitialisedSources
            val marker = Markers.appendEntries(
              Map(
                "totalSourcesToCrawl" -> newMap.size,
                "resource" -> label.resourceType.name,
                "sourcesYetToCrawl" -> uninitialisedSources,
                "duration" -> timeSpent,
                "durationType" -> "healthcheck",
                // Whole-number percentage in [0, 100]. newMap always holds at
                // least the key inserted above, so the divisor is non-zero.
                "percentageCrawled" -> math.floor(
                  100.0 * crawledSources / newMap.size
                )
              ).asJava
            )
            if (uninitialisedSources == 0) {
              initialisedResources += (label.resourceType -> true)
              log.info(
                s"Healthcheck passed successfully for ${label.resourceType.name} after ${timeSpent}ms"
              )(marker)
            } else {
              log.info(
                s"$uninitialisedSources out of ${newMap.size} still not healthy after ${timeSpent}ms"
              )(marker)
            }
          }
        case Failure(e) =>
          // Keep the cause in the log so the failure can be diagnosed.
          log.warn(s"failed to update resource ${label.resourceType.name}", e)
      }
  }

  /** Time at which this agent was constructed. */
  val bootTime = new DateTime()

  /** All current source statuses wrapped in a datum labelled with the
    * synthetic `sources` resource type and a `prism` origin.
    */
  def sources: Datum[SourceStatus] = {
    val statusList = sourceStatusAgent().values
    val label = Label(
      ResourceType("sources"),
      new Origin {
        val vendor = "prism"
        val account = "prism"
        val resources = Set("sources")
        val crawlRate = Map(
          ("sources" -> CrawlRate(Duration.Inf, Duration.Undefined))
        )
        val jsonFields = Map.empty[String, String]

        override def toMarkerMap: Map[String, Any] = jsonFields
      },
      statusList.size,
      bootTime
    )
    Datum(label, statusList.toSeq)
  }

  override def toMarkerMap: Map[String, Any] = Map.empty
}