in app/agent/collector.scala [39:101]
/** Looks up the entry registered for `collector` in `datumAgents` and invokes it to obtain
  * the datum it currently holds.
  *
  * NOTE(review): `datumAgents` looks like a Map keyed by collector; if so, an unknown
  * collector would throw rather than return a default — confirm against the declaration.
  *
  * @param collector the collector whose datum is wanted
  * @return the datum currently held for `collector`
  */
def get(collector: Collector[T]): Datum[T] = {
  val agent = datumAgents(collector)
  agent()
}
/** Invokes every value held in `datumAgents` and returns the resulting datums.
  *
  * @return one [[Datum]] per entry in `datumAgents`
  */
def get(): Iterable[Datum[T]] =
  for (agent <- datumAgents.values) yield agent()
/** Flattens every datum into `(label, item)` pairs, pairing each item in a datum's `data`
  * with that datum's label.
  *
  * @return one tuple per data item across all datums returned by `get()`
  */
def getTuples: Iterable[(Label, T)] =
  // An explicit comprehension replaces the postfix-operator form `datum.label ->`,
  // which relies on the discouraged postfixOps language feature.
  for {
    datum <- get()
    item <- datum.data
  } yield datum.label -> item
/** Collects the label of every datum returned by `get()`.
  *
  * @return the labels as a `Seq`, one per datum
  */
def getLabels: Seq[Label] = {
  val labels = for (datum <- get()) yield datum.label
  labels.toSeq
}
/** Total number of data items held across all datums returned by `get()`.
  *
  * @return the sum of `data.size` over every datum
  */
def size: Int = {
  val recordCounts = get().map(datum => datum.data.size)
  recordCounts.sum
}
/** Marker entries describing this collection: the current total record count under
  * `"records"` and the fixed duration type `"crawl"` under `"durationType"`.
  *
  * @return the marker entries as a map
  */
override def toMarkerMap: Map[String, Any] = {
  val recordCount = size
  Map("records" -> recordCount, "durationType" -> "crawl")
}
/** Crawls `collector` and decides which datum should be kept for it.
  *
  * A fresh [[Datum]] is built from `collector` while a stopwatch times the construction, and
  * the new datum's label is passed to `sourceStatusAgent.update` whatever the outcome.
  *
  *  - If the new label carries no error, the crawl is logged at info level and the new datum
  *    is returned.
  *  - If the new label carries an error, the failure is logged and `previous` is returned
  *    unchanged. The log level and message depend on `previous`: error if it is itself an
  *    error or its data is stale, warn if its data is not yet stale.
  *
  * @param collector the collector to crawl
  * @param previous  the datum held before this crawl; returned as-is when the crawl fails
  * @return the freshly crawled datum on success, otherwise `previous`
  */
def update(collector: Collector[T], previous: Datum[T]): Datum[T] = {
  val stopWatch = new StopWatch
  val datum = Datum[T](collector)
  val timeSpent = stopWatch.elapsed
  sourceStatusAgent.update(datum.label)
  datum.label match {
    // Successful crawl. Note that `size` bound here is the label's own field and shadows
    // this class's `size` method within this case.
    case l @ Label(product, origin, size, _, None) =>
      val marker = Markers.appendEntries(
        (origin.toMarkerMap ++ l.toMarkerMap ++ this.toMarkerMap ++ Map(
          "duration" -> timeSpent
        )).asJava
      )
      log.info(
        s"Crawl of ${product.name} from $origin successful (${timeSpent}ms): $size records, ${l.bestBefore}"
      )(marker)
      datum

    // Failed crawl: keep whatever was there before and report how usable it still is.
    case l @ Label(product, origin, _, _, Some(error)) =>
      // NOTE(review): unlike the success path, this marker omits `origin.toMarkerMap` —
      // confirm that is intentional.
      val marker = Markers.appendEntries(
        (l.toMarkerMap ++ this.toMarkerMap ++ Map(
          "duration" -> timeSpent
        )).asJava
      )
      val previousLabel = previous.label
      // Branching with if/else evaluates staleness once and is exhaustive by construction,
      // so no catch-all case is needed and the result cannot fall between two separate
      // evaluations of `isStale`.
      if (previousLabel.isError) {
        log.error(
          s"Crawl of ${product.name} from $origin failed (${timeSpent}ms): NO data available as this has not been crawled successfully since Prism started",
          error
        )(marker)
      } else if (previousLabel.bestBefore.isStale) {
        log.error(
          s"Crawl of ${product.name} from $origin failed (${timeSpent}ms): leaving previously crawled STALE data (${previousLabel.bestBefore.age.getStandardSeconds} seconds old)",
          error
        )(marker)
      } else {
        log.warn(
          s"Crawl of ${product.name} from $origin failed (${timeSpent}ms): leaving previously crawled data (${previousLabel.bestBefore.age.getStandardSeconds} seconds old)",
          error
        )(marker)
      }
      previous
  }
}