in common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala [311:380]
override protected[core] def query(table: String,
startKey: List[Any],
endKey: List[Any],
skip: Int,
limit: Int,
includeDocs: Boolean,
descending: Boolean,
reduce: Boolean,
stale: StaleParameter)(implicit transid: TransactionId): Future[List[JsObject]] = {
require(!(reduce && includeDocs), "reduce and includeDocs cannot both be true")
require(!reduce, "Reduce scenario not supported") //TODO Investigate reduce
require(skip >= 0, "skip should be non negative")
require(limit >= 0, "limit should be non negative")
documentHandler.checkIfTableSupported(table)
val Array(ddoc, viewName) = table.split("/")
val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] '$collName' searching '$table'")
val realIncludeDocs = includeDocs | documentHandler.shouldAlwaysIncludeDocs(ddoc, viewName)
val realLimit = if (limit > 0) skip + limit else limit
val querySpec = viewMapper.prepareQuery(ddoc, viewName, startKey, endKey, realLimit, realIncludeDocs, descending)
val options = newFeedOptions()
val queryMetrics = scala.collection.mutable.Buffer[QueryMetrics]()
if (transid.meta.extraLogging) {
options.setPopulateQueryMetrics(true)
options.setEmitVerboseTracesInQuery(true)
}
def collectQueryMetrics(r: FeedResponse[Document]): Unit = {
collectMetrics(queryToken, r.getRequestCharge)
queryMetrics.appendAll(r.getQueryMetrics.values().asScala)
}
val publisher =
RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, querySpec, options))
val f = Source
.fromPublisher(publisher)
.wireTap(collectQueryMetrics(_))
.mapConcat(asVector)
.drop(skip)
.map(queryResultToWhiskJsonDoc)
.map(js =>
documentHandler
.transformViewResult(ddoc, viewName, startKey, endKey, realIncludeDocs, js, CosmosDBArtifactStore.this))
.mapAsync(1)(identity)
.mapConcat(identity)
.runWith(Sink.seq)
.map(_.toList)
.map(l => if (limit > 0) l.take(limit) else l)
val g = f.andThen {
case Success(queryResult) =>
if (queryMetrics.nonEmpty) {
val combinedMetrics = QueryMetrics.ZERO.add(queryMetrics.toSeq: _*)
logging.debug(
this,
s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
}
val stats = viewMapper.recordQueryStats(ddoc, viewName, descending, querySpec.getParameters, queryResult)
val statsToLog = stats.map(s => " " + s).getOrElse("")
transid.finished(
this,
start,
s"[QUERY] '$collName' completed: matched ${queryResult.size}$statsToLog",
InfoLevel)
}
reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
}