in couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala [50:164]
def insert(document: JsonDocument): Future[JsonDocument] = insertDoc(document)
def insertDoc[T <: Document[_]](document: T): Future[T] =
singleObservableToFuture(asyncBucket.insert(document), document)
def insert(document: JsonDocument, writeSettings: CouchbaseWriteSettings): Future[JsonDocument] =
insertDoc(document, writeSettings)
def insertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T] =
singleObservableToFuture(asyncBucket.insert(document,
writeSettings.persistTo,
writeSettings.replicateTo,
writeSettings.timeout.toMillis,
TimeUnit.MILLISECONDS),
document)
def get(id: String): Future[Option[JsonDocument]] =
zeroOrOneObservableToFuture(asyncBucket.get(id))
def get[T <: Document[_]](id: String, documentClass: Class[T]): Future[Option[T]] =
zeroOrOneObservableToFuture(asyncBucket.get(id, documentClass))
def get(id: String, timeout: FiniteDuration): Future[Option[JsonDocument]] =
zeroOrOneObservableToFuture(asyncBucket.get(id, timeout.toMillis, TimeUnit.MILLISECONDS))
def get[T <: Document[_]](id: String,
timeout: FiniteDuration,
documentClass: Class[T]): scala.concurrent.Future[Option[T]] =
zeroOrOneObservableToFuture(asyncBucket.get(id, documentClass, timeout.toMillis, TimeUnit.MILLISECONDS))
def upsert(document: JsonDocument): Future[JsonDocument] = upsertDoc(document)
def upsertDoc[T <: Document[_]](document: T): Future[T] =
singleObservableToFuture(asyncBucket.upsert(document), document.id)
def upsert(document: JsonDocument, writeSettings: CouchbaseWriteSettings): Future[JsonDocument] =
upsertDoc(document, writeSettings)
def upsertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T] =
singleObservableToFuture(asyncBucket.upsert(document,
writeSettings.persistTo,
writeSettings.replicateTo,
writeSettings.timeout.toMillis,
TimeUnit.MILLISECONDS),
document.id)
def replace(document: JsonDocument): Future[JsonDocument] = replaceDoc(document)
def replaceDoc[T <: Document[_]](document: T): Future[T] =
singleObservableToFuture(asyncBucket.replace(document), document.id)
def replace(document: JsonDocument, writeSettings: CouchbaseWriteSettings): Future[JsonDocument] =
replaceDoc(document, writeSettings)
def replaceDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T] =
singleObservableToFuture(asyncBucket.replace(document,
writeSettings.persistTo,
writeSettings.replicateTo,
writeSettings.timeout.toMillis,
TimeUnit.MILLISECONDS),
document.id)
def remove(id: String): Future[Done] =
singleObservableToFuture(asyncBucket.remove(id), id)
.map(_ => Done)(ExecutionContexts.parasitic)
def remove(id: String, writeSettings: CouchbaseWriteSettings): Future[Done] =
singleObservableToFuture(asyncBucket.remove(id,
writeSettings.persistTo,
writeSettings.replicateTo,
writeSettings.timeout.toMillis,
TimeUnit.MILLISECONDS),
id)
.map(_ => Done)(ExecutionContexts.parasitic)
def streamedQuery(query: N1qlQuery): Source[JsonObject, NotUsed] =
// FIXME verify cancellation works
Source.fromPublisher(RxReactiveStreams.toPublisher(asyncBucket.query(query).flatMap(RxUtilities.unfoldJsonObjects)))
def streamedQuery(query: Statement): Source[JsonObject, NotUsed] =
Source.fromPublisher(RxReactiveStreams.toPublisher(asyncBucket.query(query).flatMap(RxUtilities.unfoldJsonObjects)))
def singleResponseQuery(query: Statement): Future[Option[JsonObject]] =
singleResponseQuery(N1qlQuery.simple(query))
def singleResponseQuery(query: N1qlQuery): Future[Option[JsonObject]] =
zeroOrOneObservableToFuture(asyncBucket.query(query).flatMap(RxUtilities.unfoldJsonObjects))
def counter(id: String, delta: Long, initial: Long): Future[Long] =
singleObservableToFuture(asyncBucket.counter(id, delta, initial), id)
.map(_.content(): Long)(ExecutionContexts.parasitic)
def counter(id: String, delta: Long, initial: Long, writeSettings: CouchbaseWriteSettings): Future[Long] =
singleObservableToFuture(asyncBucket.counter(id,
delta,
initial,
writeSettings.persistTo,
writeSettings.replicateTo,
writeSettings.timeout.toMillis,
TimeUnit.MILLISECONDS),
id)
.map(_.content(): Long)(ExecutionContexts.parasitic)
def close(): Future[Done] =
if (!asyncBucket.isClosed) {
singleObservableToFuture(asyncBucket.close(), "close")
.flatMap { _ =>
cluster match {
case Some(cluster) =>
singleObservableToFuture(cluster.disconnect(), "close").map(_ => Done)(ExecutionContexts.global())
case None => Future.successful(Done)
}
}(ExecutionContexts.global())
} else {
Future.successful(Done)
}