def sendScrollScanRequest()

in elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala [110:228]


  def sendScrollScanRequest(): Unit =
    try {
      waitingForElasticData = true

      scrollId match {
        case None => {
          log.debug("Doing initial search")

          // Add extra params to search
          val extraParams = Seq(
            if (!searchParams.contains("size")) {
              Some("size" -> settings.bufferSize.toString)
            } else {
              None
            },
            // Tell elastic to return the documents '_version'-property with the search-results
            // http://nocf-www.elastic.co/guide/en/elasticsearch/reference/current/search-request-version.html
            // https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html
            if (!searchParams.contains("version") && settings.includeDocumentVersion) {
              Some("version" -> "true")
            } else {
              None
            })

          val baseMap = Map("scroll" -> settings.scroll)

          // only force sorting by _doc (meaning order is not known) if not specified in search params
          val sortQueryParam = if (searchParams.contains("sort")) {
            None
          } else {
            Some(("sort", "_doc"))
          }

          val routingQueryParam = searchParams.get("routing").map(r => ("routing", r))

          val queryParams = baseMap ++ routingQueryParam ++ sortQueryParam
          val completeParams = searchParams ++ extraParams.flatten - "routing"

          val searchBody = "{" + completeParams
            .map {
              case (name, json) =>
                "\"" + name + "\":" + json
            }
            .mkString(",") + "}"

          val endpoint: String = settings.apiVersion match {
            case ApiVersion.V5           => s"/${elasticsearchParams.indexName}/${elasticsearchParams.typeName.get}/_search"
            case ApiVersion.V7           => s"/${elasticsearchParams.indexName}/_search"
            case OpensearchApiVersion.V1 => s"/${elasticsearchParams.indexName}/_search"
            case other                   => throw new IllegalArgumentException(s"API version $other is not supported")
          }

          val uri = prepareUri(Path(endpoint))
            .withQuery(Uri.Query(queryParams))

          val request = HttpRequest(HttpMethods.POST)
            .withUri(uri)
            .withEntity(
              HttpEntity(ContentTypes.`application/json`, searchBody))
            .withHeaders(settings.connection.headers)

          ElasticsearchApi
            .executeRequest(
              request,
              settings.connection)
            .flatMap {
              case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
                Unmarshal(responseEntity)
                  .to[String]
                  .map(json => responseHandler.invoke(json))
              case response: HttpResponse =>
                Unmarshal(response.entity).to[String].map { body =>
                  failureHandler
                    .invoke(
                      new RuntimeException(s"Request failed for POST $uri, got ${response.status} with body: $body"))
                }
            }
            .recover {
              case cause: Throwable => failureHandler.invoke(cause)
            }
        }

        case Some(actualScrollId) => {
          log.debug("Fetching next scroll")

          val uri = prepareUri(Path("/_search/scroll"))

          val request = HttpRequest(HttpMethods.POST)
            .withUri(uri)
            .withEntity(
              HttpEntity(ContentTypes.`application/json`,
                Map("scroll" -> settings.scroll, "scroll_id" -> actualScrollId).toJson.compactPrint))
            .withHeaders(settings.connection.headers)

          ElasticsearchApi
            .executeRequest(
              request,
              settings.connection)
            .flatMap {
              case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
                Unmarshal(responseEntity)
                  .to[String]
                  .map(json => responseHandler.invoke(json))
              case response: HttpResponse =>
                Unmarshal(response.entity)
                  .to[String]
                  .map { body =>
                    failureHandler.invoke(
                      new RuntimeException(s"Request failed for POST $uri, got ${response.status} with body: $body"))
                  }
            }
            .recover {
              case cause: Throwable => failureHandler.invoke(cause)
            }
        }
      }
    } catch {
      case ex: Exception => failureHandler.invoke(ex)
    }