backend/app/services/ElasticsearchSyntax.scala (119 lines of code) (raw):
package services
import com.sksamuel.elastic4s.{ElasticClient, ElasticRequest, Executor, Functor, Handler, HttpClient, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.fields.{ObjectField, TextField}
import com.sksamuel.elastic4s.http.{JavaClient, JavaClientExceptionWrapper}
import com.sksamuel.elastic4s.requests.bulk.BulkCompatibleRequest
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.requests.indexes.CreateIndexResponse
import com.sksamuel.elastic4s.requests.indexes.admin.IndexExistsResponse
import com.sksamuel.elastic4s.requests.mappings.dynamictemplate.DynamicMapping
import com.sksamuel.elastic4s.requests.mappings.{MappingDefinition}
import com.sksamuel.elastic4s.requests.update.{UpdateByQueryRequest, UpdateRequest}
import model.Language
import org.apache.http.{ContentTooLongException, HttpHost}
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.sniff.Sniffer
import utils.Logging
import utils.attempt.{Attempt, ContentTooLongFailure, ElasticSearchQueryFailure, MultipleFailures, UnknownFailure}
import scala.concurrent.ExecutionContext
trait ElasticsearchSyntax { this: Logging =>
def client: ElasticClient
// Can be overridden to avoid gnarly waiting in tests
def refreshPolicy: RefreshPolicy = RefreshPolicy.NONE
implicit def attemptFunctor(implicit ec: ExecutionContext): Functor[Attempt] = new Functor[Attempt] {
override def map[A, B](fa: Attempt[A])(f: A => B): Attempt[B] = fa.map(f)
}
def textKeywordField(name: String): TextField = {
textField(name: String).fields(keywordField("keyword"))
}
// Use multiLanguageField unless you have an index per language
def singleLanguageField(name: String, language: Language): TextField = {
textField(name)
.analyzer(language.analyzer)
.termVector("with_positions_offsets")
.fields(
textField("exact")
.analyzer("standard")
.termVector("with_positions_offsets")
)
}
// Each entry is added by a call to multiLanguageField
def emptyMultiLanguageField(name: String): ObjectField = objectField(name)
def multiLanguageField(name: String, language: Language): ObjectField = {
ObjectField(name, properties = Seq(
singleLanguageField(language.key, language)
))
}
def multiLanguageValue(languages: List[Language], value: Any): Map[String, Any] = languages.map { lang =>
lang.key -> value
}.toMap
implicit def attemptExecutor(implicit ec: ExecutionContext): Executor[Attempt] =
(client: HttpClient, request: ElasticRequest) => {
Attempt.fromFuture(Executor.FutureExecutor(ec).exec(client, request)) {
case err: JavaClientExceptionWrapper if err.getCause.isInstanceOf[ContentTooLongException] =>
ContentTooLongFailure(err.getMessage)
case err =>
UnknownFailure(err)
}
}
def createIndexIfNotAlreadyExists(indexName: String, mappingDefinition: MappingDefinition)(implicit ec: ExecutionContext): Attempt[CreateIndexResponse] = {
execute(indexExists(indexName)).flatMap {
case IndexExistsResponse(false) =>
execute(createIndex(indexName).mapping(
// We don't want to index unknown fields at all, throw an error on write instead
mappingDefinition.dynamic(DynamicMapping.Strict))
)
case _ =>
logger.info(s"Elasticsearch index $indexName already exists")
Attempt.Right(CreateIndexResponse(acknowledged = true, shards_acknowledged = true))
}
}
def execute[T, U](r: T)(implicit handler: Handler[T, U], manifest: Manifest[U], ec: ExecutionContext): Attempt[U] =
client.execute(r).flatMap {
case r: RequestSuccess[U] =>
Attempt.Right(r.result)
case f: RequestFailure =>
Attempt.Left(ElasticSearchQueryFailure(new IllegalStateException(f.error.toString), f.status, f.body))
}
def executeNoReturn[T, U](r: T)(implicit handler: Handler[T, U], manifest: Manifest[U], ec: ExecutionContext): Attempt[Unit] =
client.execute(r).flatMap {
case _: RequestSuccess[U] =>
Attempt.Right(())
case f: RequestFailure =>
Attempt.Left(ElasticSearchQueryFailure(new IllegalStateException(f.error.toString), f.status, f.body))
}
def executeUpdate[U](r: UpdateRequest)(implicit ec: ExecutionContext): Attempt[Unit] =
executeNoReturn(r.refresh(refreshPolicy))
def executeUpdateByQuery[U](r: UpdateByQueryRequest)(implicit ec: ExecutionContext): Attempt[Unit] =
executeNoReturn(r.refresh(refreshPolicy))
def executeUpdateByQueryImmediateRefresh[U](r: UpdateByQueryRequest)(implicit ec: ExecutionContext): Attempt[Unit] =
executeNoReturn(r.refresh(RefreshPolicy.IMMEDIATE))
def executeBulk(requests: Iterable[BulkCompatibleRequest])(implicit ec: ExecutionContext): Attempt[Unit] = {
execute(bulk(requests).refresh(refreshPolicy)).flatMap { response =>
if(response.hasFailures) {
Attempt.Left(MultipleFailures(response.failures.map { f =>
ElasticSearchQueryFailure(new IllegalStateException(f.error.toString), f.status, None)
}.toList))
} else {
Attempt.Right(())
}
}
}
}
object ElasticsearchSyntax {
object NestedField {
val key = "key"
val values = "values"
}
}
object ElasticsearchClient extends Logging {
def apply(config: Config)(implicit executionContext: ExecutionContext): Attempt[ElasticClient] =
apply(config.elasticsearch.hosts, config.elasticsearch.disableSniffing.getOrElse(false))
def apply(hostnames: List[String], disableSniffing: Boolean = false)(implicit executionContext: ExecutionContext): Attempt[ElasticClient] = Attempt.catchNonFatalBlasé {
val hosts = hostnames.map(HttpHost.create)
val client = RestClient.builder(hosts: _*).setRequestConfigCallback(reqConfigCallback =>
reqConfigCallback.setConnectionRequestTimeout(5000) // Default is 500. Needed for when we send lots of updates quickly.
).build()
// Sniffer allows the client to discover the other nodes in the cluster and load balancer/route around failure
if(!disableSniffing) {
Sniffer.builder(client).build()
}
ElasticClient(JavaClient.fromRestClient(client))
}
}