override protected[core] def query()

in common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala [311:380]


  /**
   * Runs a view query against the Cosmos DB collection and returns the transformed view rows.
   *
   * Paging is emulated on the client: when `limit` is positive the query is prepared with a limit of
   * `skip + limit`, the first `skip` documents of the response stream are dropped, and the final list
   * is truncated to `limit` entries. When `limit` is 0 the limit is handed to the view mapper unchanged
   * and the result list is not truncated here.
   *
   * If `transid.meta.extraLogging` is set, query metrics are requested from Cosmos DB and, when any were
   * collected, logged at debug level once the query succeeds.
   *
   * @param table view identifier; it must split on "/" into exactly two parts (design doc and view name)
   * @param startKey start key handed to the view mapper and to the document handler
   * @param endKey end key handed to the view mapper and to the document handler
   * @param skip number of leading documents to drop; must be non negative
   * @param limit maximum number of rows to return; must be non negative
   * @param includeDocs whether documents should be included; also enabled when the document handler
   *                    reports that the view always includes docs
   * @param descending ordering flag handed to the view mapper
   * @param reduce must be false, reduce queries are not supported
   * @param stale not used by this implementation
   * @param transid transaction id used for logging markers and the extra-logging switch
   * @return future list of rows produced by the document handler for the matched documents
   * @throws IllegalArgumentException if `reduce` is true or `skip` / `limit` is negative
   */
  override protected[core] def query(table: String,
                                     startKey: List[Any],
                                     endKey: List[Any],
                                     skip: Int,
                                     limit: Int,
                                     includeDocs: Boolean,
                                     descending: Boolean,
                                     reduce: Boolean,
                                     stale: StaleParameter)(implicit transid: TransactionId): Future[List[JsObject]] = {
    require(!(reduce && includeDocs), "reduce and includeDocs cannot both be true")
    require(!reduce, "Reduce scenario not supported") //TODO Investigate reduce
    require(skip >= 0, "skip should be non negative")
    require(limit >= 0, "limit should be non negative")
    documentHandler.checkIfTableSupported(table)

    val Array(ddoc, viewName) = table.split("/")

    val startMarker = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] '$collName' searching '$table'")
    val handlerForcesDocs = documentHandler.shouldAlwaysIncludeDocs(ddoc, viewName)
    val effectiveIncludeDocs = includeDocs || handlerForcesDocs
    // skip is applied on the client (see drop below), so the query has to fetch skip + limit documents
    val fetchLimit = if (limit > 0) skip + limit else limit

    val sqlQuery =
      viewMapper.prepareQuery(ddoc, viewName, startKey, endKey, fetchLimit, effectiveIncludeDocs, descending)

    val feedOptions = newFeedOptions()
    if (transid.meta.extraLogging) {
      feedOptions.setPopulateQueryMetrics(true)
      feedOptions.setEmitVerboseTracesInQuery(true)
    }

    // Filled per response page while the stream runs; read only after the stream has completed
    val metricsSoFar = scala.collection.mutable.Buffer[QueryMetrics]()

    def recordPageMetrics(page: FeedResponse[Document]): Unit = {
      collectMetrics(queryToken, page.getRequestCharge)
      metricsSoFar.appendAll(page.getQueryMetrics.values().asScala)
    }

    // Phase 1: response pages as delivered by the Cosmos DB client
    val pages = Source.fromPublisher(
      RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, sqlQuery, feedOptions)))

    // Phase 2: flatten pages to documents, apply skip and let the document handler build the view rows
    val viewRows = pages
      .wireTap(recordPageMetrics(_))
      .mapConcat(asVector)
      .drop(skip)
      .map(queryResultToWhiskJsonDoc)
      .map { js =>
        documentHandler
          .transformViewResult(ddoc, viewName, startKey, endKey, effectiveIncludeDocs, js, CosmosDBArtifactStore.this)
      }
      .mapAsync(1)(identity)
      .mapConcat(identity)

    // Phase 3: materialize and enforce the caller's limit on the transformed rows
    val results = viewRows
      .runWith(Sink.seq)
      .map { rows =>
        val all = rows.toList
        if (limit > 0) all.take(limit) else all
      }

    val loggedResults = results.andThen {
      case Success(rows) =>
        if (metricsSoFar.nonEmpty) {
          val combinedMetrics = QueryMetrics.ZERO.add(metricsSoFar.toSeq: _*)
          logging.debug(
            this,
            s"[QueryMetricsEnabled] Collection [$collName] - Query [${sqlQuery.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
        }
        val statsSuffix = viewMapper
          .recordQueryStats(ddoc, viewName, descending, sqlQuery.getParameters, rows)
          .map(" " + _)
          .getOrElse("")
        transid.finished(
          this,
          startMarker,
          s"[QUERY] '$collName' completed: matched ${rows.size}$statsSuffix",
          InfoLevel)
    }
    reportFailure(loggedResults, startMarker, t => s"[QUERY] '$collName' internal error, failure: '${t.getMessage}'")
  }