in elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala [110:228]
/**
 * Sends the next request of the scroll search and routes its outcome to the stage handlers.
 *
 * Marks the stage as waiting (`waitingForElasticData = true`) and then:
 *  - when no `scrollId` is known yet, POSTs the initial search to the index `_search` endpoint.
 *    `scroll`, `routing` (if given) and a default `sort=_doc` (if no sort was given) travel as
 *    query parameters; every other search param is rendered into the JSON body.
 *  - otherwise POSTs to `/_search/scroll` with the current scroll id to fetch the next batch.
 *
 * A `200 OK` body is passed to `responseHandler`; any other status, a failed future, or an
 * exception thrown while building the request is passed to `failureHandler`.
 */
def sendScrollScanRequest(): Unit =
  try {
    waitingForElasticData = true

    // Shared by both branches: run the request, then hand the body (on 200 OK) or a
    // descriptive failure (any other status / failed future) to the matching handler.
    def postAndDispatch(request: HttpRequest, uri: Uri) =
      ElasticsearchApi
        .executeRequest(request, settings.connection)
        .flatMap {
          case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
            Unmarshal(responseEntity)
              .to[String]
              .map(json => responseHandler.invoke(json))
          case response: HttpResponse =>
            Unmarshal(response.entity)
              .to[String]
              .map { body =>
                failureHandler.invoke(
                  new RuntimeException(s"Request failed for POST $uri, got ${response.status} with body: $body"))
              }
        }
        .recover {
          case cause: Throwable => failureHandler.invoke(cause)
        }

    scrollId match {
      case None =>
        log.debug("Doing initial search")

        // Defaults that are only added when the caller did not set them explicitly.
        val sizeParam =
          if (!searchParams.contains("size")) Some("size" -> settings.bufferSize.toString)
          else None
        // Tell elastic to return the documents '_version'-property with the search-results
        // http://nocf-www.elastic.co/guide/en/elasticsearch/reference/current/search-request-version.html
        // https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html
        val versionParam =
          if (!searchParams.contains("version") && settings.includeDocumentVersion) Some("version" -> "true")
          else None
        val extraParams = Seq(sizeParam, versionParam)

        // only force sorting by _doc (meaning order is not known) if not specified in search params
        val sortQueryParam =
          if (searchParams.contains("sort")) None
          else Some(("sort", "_doc"))
        // routing is sent as a query parameter, so it is removed from the body params below
        val routingQueryParam = searchParams.get("routing").map(r => ("routing", r))
        val queryParams = Map("scroll" -> settings.scroll) ++ routingQueryParam ++ sortQueryParam

        val bodyParams = (searchParams ++ extraParams.flatten) - "routing"
        // Values are inserted verbatim: each search param value must already be valid JSON.
        val searchBody = bodyParams
          .map { case (name, json) => "\"" + name + "\":" + json }
          .mkString("{", ",", "}")

        val endpoint: String = settings.apiVersion match {
          case ApiVersion.V5 =>
            // V5 endpoints address a type; fail with a clear message instead of `None.get`.
            val typeName = elasticsearchParams.typeName.getOrElse(
              throw new IllegalArgumentException(
                s"API version ${ApiVersion.V5} requires a type name, but none is set in the Elasticsearch params"))
            s"/${elasticsearchParams.indexName}/$typeName/_search"
          case ApiVersion.V7           => s"/${elasticsearchParams.indexName}/_search"
          case OpensearchApiVersion.V1 => s"/${elasticsearchParams.indexName}/_search"
          case other                   => throw new IllegalArgumentException(s"API version $other is not supported")
        }

        val uri = prepareUri(Path(endpoint)).withQuery(Uri.Query(queryParams))
        val request = HttpRequest(HttpMethods.POST)
          .withUri(uri)
          .withEntity(HttpEntity(ContentTypes.`application/json`, searchBody))
          .withHeaders(settings.connection.headers)
        postAndDispatch(request, uri)

      case Some(actualScrollId) =>
        log.debug("Fetching next scroll")
        val uri = prepareUri(Path("/_search/scroll"))
        val scrollBody = Map("scroll" -> settings.scroll, "scroll_id" -> actualScrollId).toJson.compactPrint
        val request = HttpRequest(HttpMethods.POST)
          .withUri(uri)
          .withEntity(HttpEntity(ContentTypes.`application/json`, scrollBody))
          .withHeaders(settings.connection.headers)
        postAndDispatch(request, uri)
    }
  } catch {
    // Anything thrown synchronously while building or dispatching the request fails via the handler.
    case ex: Exception => failureHandler.invoke(ex)
  }