backend/app/services/events/Events.scala (131 lines of code) (raw):
package services.events
import java.time.Instant
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.requests.searches.queries.RangeQuery
import model.frontend.Paging
import play.api.libs.json._
import services.ElasticsearchSyntax
import services.ElasticsearchSyntax.NestedField
import utils.Logging
import utils.attempt.Attempt
import scala.concurrent.ExecutionContext
sealed trait EventType
case object ActionStarted extends EventType
case object ActionComplete extends EventType
case object ActionFailed extends EventType
object EventType {
def fromString(eventType: String) = eventType match {
case "ActionStarted" => ActionStarted
case "ActionComplete" => ActionComplete
case "ActionFailed" => ActionFailed
case _ => throw new IllegalArgumentException(s"$eventType is not a valid EventType")
}
implicit val format: Format[EventType] = Format(
{
case JsString("ActionStarted") => JsSuccess(ActionStarted)
case JsString("ActionComplete") => JsSuccess(ActionComplete)
case JsString("ActionFailed") => JsSuccess(ActionFailed)
case unknown => JsError(s"Unknown EventType $unknown")
},
{
case ActionStarted => JsString("ActionStarted")
case ActionComplete => JsString("ActionComplete")
case ActionFailed => JsString("ActionFailed")
}
)
}
case class Event(eventType: EventType, timestamp: Long, description: String, tags: Map[String, String])
object Event {
implicit val eventResponseFormat = Json.format[Event]
}
case class FindEventsResponse(results: List[Event], hits: Long, pageSize: Long, page: Long) extends Paging[Event]
object FindEventsResponse {
implicit val findEventsResponseFormat = Json.format[FindEventsResponse]
}
sealed trait EventFilter
case class TagEquals(key: String, value: String) extends EventFilter
case class TagNotEquals(key: String, value: String) extends EventFilter
trait Events {
def setup(): Attempt[Events]
def record(eventType: EventType, description: String, tags: Map[String, String]): Unit
// start inclusive, end exclusive
def find(
start: Option[Long] = None,
end: Option[Long] = None,
filters: List[EventFilter] = List.empty,
page: Int = 0,
pageSize: Int = 1000
): Attempt[FindEventsResponse]
}
class ElasticsearchEvents(override val client: ElasticClient, eventIndexName: String)(implicit ec: ExecutionContext) extends Events with Logging with ElasticsearchSyntax {
import services.index.HitReaders.EventHitReader
override def setup(): Attempt[Events] = {
createIndexIfNotAlreadyExists(eventIndexName,
properties(
textKeywordField(EventFields.eventType),
dateField(EventFields.timestamp),
textField(EventFields.description),
nestedField(EventFields.tagsField).fields(
textKeywordField(EventFields.tags.key),
textKeywordField(EventFields.tags.values)
)
)
).map(_ => this)
}
override def record(eventType: EventType, description: String, tags: Map[String, String]): Unit = {
executeNoReturn {
indexInto(eventIndexName).fields(
EventFields.eventType -> eventType.toString,
EventFields.timestamp -> Instant.now().toEpochMilli,
EventFields.description -> description,
EventFields.tagsField -> tags.map {
case (k, v) => Map(EventFields.tags.key -> k, EventFields.tags.values -> v)
}
)
}
}
override def find(start: Option[Long], end: Option[Long], filters: List[EventFilter] = List.empty, page: Int, pageSize: Int): Attempt[FindEventsResponse] = {
val filterTerms = filters.collect { case TagEquals(k, v) => getFilterTerm(k, v) }
val mustNotTerms = filters.collect { case TagNotEquals(k, v) => getFilterTerm(k, v) }
execute {
search(eventIndexName)
.query(boolQuery().filter(getRangeQuery(start, end) ++ filterTerms).not(mustNotTerms))
.sortByFieldDesc(EventFields.timestamp)
.size(pageSize)
.from(page * pageSize)
}.map { resp =>
val events = resp.hits.hits.map(_.to[Event]).toList
val total = resp.totalHits
FindEventsResponse(events, total.toInt, pageSize, page)
}
}
private def getRangeQuery(start: Option[Long], end: Option[Long]): Option[RangeQuery] = (start, end) match {
case (Some(s), Some(e)) =>
Some(rangeQuery(EventFields.timestamp).gte(s).lt(e))
case (Some(s), None) =>
Some(rangeQuery(EventFields.timestamp).gte(s))
case (None, Some(e)) =>
Some(rangeQuery(EventFields.timestamp).lt(e))
case _ =>
None
}
private def getFilterTerm(k: String, v: String) = {
nestedQuery(EventFields.tagsField,
boolQuery().must(
termQuery(EventFields.tagsField + "." + NestedField.key + ".keyword", k),
termQuery(EventFields.tagsField + "." + NestedField.values + ".keyword", v)
)
)
}
}
object EventFields {
val eventsField = "events"
val eventType = "eventType"
val timestamp = "timestamp"
val description = "description"
val tagsField = "tags"
object tags {
val key = "key"
val values = "values"
}
}