in app/agent/collector.scala [103:139]
/** Builds `datumAgents`: one scheduled agent per collector that has a finite refresh period.
  *
  * For each collector an initial datum is chosen first:
  *  - with `lazyStartup` enabled, an empty datum is used and its label is pushed to
  *    `sourceStatusAgent`;
  *  - otherwise `update` is called synchronously on an empty datum and the result is asserted
  *    not to carry an error label.
  *
  * Collectors whose crawl-rate refresh period is not a `FiniteDuration` get no agent; a warning
  * is logged and they are left out of the resulting map.
  *
  * @throws AssertionError if lazy startup is disabled and the initial collection reports an
  *                        error label (only while assertions are enabled in the build)
  */
def init(): Unit = {
  log.info(s"Starting agent for collectors: $collectors")

  // One generator is enough for all collectors; it only spreads out the first refresh.
  val delayGenerator = new Random()

  datumAgents = collectors.flatMap { collector =>
    val initial =
      if (lazyStartup) {
        val startupData = Datum.empty[T](collector)
        sourceStatusAgent.update(startupData.label)
        startupData
      } else {
        val startupData = update(collector, Datum.empty[T](collector))
        assert(
          !startupData.label.isError,
          s"Error occurred collecting data when lazy startup is disabled: ${startupData.label}"
        )
        startupData
      }

    // Initial delay in whole seconds, drawn from [0, 60).
    val randomDelay = delayGenerator.nextInt(60)
    val refreshPeriod = collector.crawlRate.refreshPeriod

    val agent: Option[ScheduledAgent[Datum[T]]] = refreshPeriod match {
      case fd: FiniteDuration =>
        Some(
          ScheduledAgent[Datum[T]](randomDelay.seconds, fd, initial) { previous =>
            update(collector, previous)
          }
        )
      case _ =>
        log.warn(
          s"The crawl rate period for $collector is ${refreshPeriod}. This is not a finite duration so we are not initialising an agent."
        )
        None
    }

    agent.map(collector -> _)
  }.toMap

  log.info(s"Started agent for collectors: $collectors")
}