def insert()

in couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala [50:164]


  /** Inserts a `JsonDocument` by forwarding to the single-argument `insertDoc`. */
  def insert(document: JsonDocument): Future[JsonDocument] =
    insertDoc[JsonDocument](document)

  /**
   * Inserts `document` through `asyncBucket.insert` and adapts the resulting observable
   * to a `Future` via `singleObservableToFuture`.
   *
   * The document id, not the whole document, is passed as the second argument of
   * `singleObservableToFuture`, in line with `upsertDoc`, `replaceDoc`, `remove` and `counter`.
   * NOTE(review): that argument is presumably only used to label failures; confirm in the helper.
   *
   * @param document the document to insert
   * @return a future of the value emitted by the insert observable
   */
  def insertDoc[T <: Document[_]](document: T): Future[T] =
    singleObservableToFuture(asyncBucket.insert(document), document.id)

  /** Inserts a `JsonDocument` with the given write settings by forwarding to the two-argument `insertDoc`. */
  def insert(document: JsonDocument, writeSettings: CouchbaseWriteSettings): Future[JsonDocument] =
    insertDoc[JsonDocument](document, writeSettings)

  /**
   * Inserts `document` through `asyncBucket.insert`, passing the persistence, replication and
   * timeout values from `writeSettings`, and adapts the resulting observable to a `Future`.
   *
   * The document id, not the whole document, is passed as the second argument of
   * `singleObservableToFuture`, in line with `upsertDoc`, `replaceDoc`, `remove` and `counter`.
   * NOTE(review): that argument is presumably only used to label failures; confirm in the helper.
   *
   * @param document the document to insert
   * @param writeSettings supplies `persistTo`, `replicateTo` and `timeout` (sent in milliseconds)
   * @return a future of the value emitted by the insert observable
   */
  def insertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T] =
    singleObservableToFuture(asyncBucket.insert(document,
      writeSettings.persistTo,
      writeSettings.replicateTo,
      writeSettings.timeout.toMillis,
      TimeUnit.MILLISECONDS),
      document.id)

  /**
   * Looks up the document stored under `id` via `asyncBucket.get` and adapts the
   * observable to a `Future[Option[_]]` with `zeroOrOneObservableToFuture`.
   */
  def get(id: String): Future[Option[JsonDocument]] = {
    val lookup = asyncBucket.get(id)
    zeroOrOneObservableToFuture(lookup)
  }

  /**
   * Looks up the document stored under `id` as the given `documentClass` via `asyncBucket.get`
   * and adapts the observable to a `Future[Option[T]]`.
   */
  def get[T <: Document[_]](id: String, documentClass: Class[T]): Future[Option[T]] = {
    val lookup = asyncBucket.get(id, documentClass)
    zeroOrOneObservableToFuture(lookup)
  }

  /**
   * Looks up the document stored under `id`, handing `timeout` (converted to milliseconds)
   * to `asyncBucket.get`, and adapts the observable to a `Future[Option[_]]`.
   */
  def get(id: String, timeout: FiniteDuration): Future[Option[JsonDocument]] = {
    val timeoutMillis = timeout.toMillis
    val lookup = asyncBucket.get(id, timeoutMillis, TimeUnit.MILLISECONDS)
    zeroOrOneObservableToFuture(lookup)
  }

  /**
   * Looks up the document stored under `id` as the given `documentClass`, handing `timeout`
   * (converted to milliseconds) to `asyncBucket.get`, and adapts the observable to a
   * `Future[Option[T]]`.
   */
  def get[T <: Document[_]](id: String,
      timeout: FiniteDuration,
      documentClass: Class[T]): scala.concurrent.Future[Option[T]] = {
    val timeoutMillis = timeout.toMillis
    val lookup = asyncBucket.get(id, documentClass, timeoutMillis, TimeUnit.MILLISECONDS)
    zeroOrOneObservableToFuture(lookup)
  }

  /** Upserts a `JsonDocument` by forwarding to the single-argument `upsertDoc`. */
  def upsert(document: JsonDocument): Future[JsonDocument] =
    upsertDoc[JsonDocument](document)

  /**
   * Upserts `document` through `asyncBucket.upsert` and adapts the resulting observable to a
   * `Future` via `singleObservableToFuture`, identifying the operation by the document id.
   */
  def upsertDoc[T <: Document[_]](document: T): Future[T] = {
    val upserted = asyncBucket.upsert(document)
    singleObservableToFuture(upserted, document.id)
  }

  /** Upserts a `JsonDocument` with the given write settings by forwarding to the two-argument `upsertDoc`. */
  def upsert(document: JsonDocument, writeSettings: CouchbaseWriteSettings): Future[JsonDocument] =
    upsertDoc[JsonDocument](document, writeSettings)

  /**
   * Upserts `document` through `asyncBucket.upsert`, passing the persistence, replication and
   * timeout values (in milliseconds) from `writeSettings`, and adapts the resulting observable
   * to a `Future`, identifying the operation by the document id.
   */
  def upsertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T] = {
    val timeoutMillis = writeSettings.timeout.toMillis
    val upserted = asyncBucket.upsert(
      document,
      writeSettings.persistTo,
      writeSettings.replicateTo,
      timeoutMillis,
      TimeUnit.MILLISECONDS)
    singleObservableToFuture(upserted, document.id)
  }

  /** Replaces a `JsonDocument` by forwarding to the single-argument `replaceDoc`. */
  def replace(document: JsonDocument): Future[JsonDocument] =
    replaceDoc[JsonDocument](document)

  /**
   * Replaces `document` through `asyncBucket.replace` and adapts the resulting observable to a
   * `Future` via `singleObservableToFuture`, identifying the operation by the document id.
   */
  def replaceDoc[T <: Document[_]](document: T): Future[T] = {
    val replaced = asyncBucket.replace(document)
    singleObservableToFuture(replaced, document.id)
  }

  /** Replaces a `JsonDocument` with the given write settings by forwarding to the two-argument `replaceDoc`. */
  def replace(document: JsonDocument, writeSettings: CouchbaseWriteSettings): Future[JsonDocument] =
    replaceDoc[JsonDocument](document, writeSettings)

  /**
   * Replaces `document` through `asyncBucket.replace`, passing the persistence, replication and
   * timeout values (in milliseconds) from `writeSettings`, and adapts the resulting observable
   * to a `Future`, identifying the operation by the document id.
   */
  def replaceDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T] = {
    val timeoutMillis = writeSettings.timeout.toMillis
    val replaced = asyncBucket.replace(
      document,
      writeSettings.persistTo,
      writeSettings.replicateTo,
      timeoutMillis,
      TimeUnit.MILLISECONDS)
    singleObservableToFuture(replaced, document.id)
  }

  /**
   * Removes the document stored under `id` via `asyncBucket.remove`; the emitted value is
   * discarded and the future completes with `Done`.
   */
  def remove(id: String): Future[Done] = {
    val removed = singleObservableToFuture(asyncBucket.remove(id), id)
    // The mapping is trivial, so it runs on the parasitic execution context.
    removed.map(_ => Done)(ExecutionContexts.parasitic)
  }

  /**
   * Removes the document stored under `id` via `asyncBucket.remove`, passing the persistence,
   * replication and timeout values (in milliseconds) from `writeSettings`; the emitted value
   * is discarded and the future completes with `Done`.
   */
  def remove(id: String, writeSettings: CouchbaseWriteSettings): Future[Done] = {
    val timeoutMillis = writeSettings.timeout.toMillis
    val removal = asyncBucket.remove(
      id,
      writeSettings.persistTo,
      writeSettings.replicateTo,
      timeoutMillis,
      TimeUnit.MILLISECONDS)
    // The mapping is trivial, so it runs on the parasitic execution context.
    singleObservableToFuture(removal, id).map(_ => Done)(ExecutionContexts.parasitic)
  }

  /**
   * Runs `query` through `asyncBucket.query`, flat-maps the result with
   * `RxUtilities.unfoldJsonObjects`, and exposes the resulting observable as a `Source`
   * by way of a Reactive Streams publisher.
   */
  def streamedQuery(query: N1qlQuery): Source[JsonObject, NotUsed] = {
    // FIXME verify cancellation works
    val rows = asyncBucket.query(query).flatMap(RxUtilities.unfoldJsonObjects)
    Source.fromPublisher(RxReactiveStreams.toPublisher(rows))
  }

  /**
   * Runs the `Statement` through `asyncBucket.query`, flat-maps the result with
   * `RxUtilities.unfoldJsonObjects`, and exposes the resulting observable as a `Source`
   * by way of a Reactive Streams publisher.
   */
  def streamedQuery(query: Statement): Source[JsonObject, NotUsed] = {
    val rows = asyncBucket.query(query).flatMap(RxUtilities.unfoldJsonObjects)
    Source.fromPublisher(RxReactiveStreams.toPublisher(rows))
  }

  /** Wraps the `Statement` with `N1qlQuery.simple` and forwards to the `N1qlQuery` overload. */
  def singleResponseQuery(query: Statement): Future[Option[JsonObject]] = {
    val wrapped = N1qlQuery.simple(query)
    singleResponseQuery(wrapped)
  }
  /**
   * Runs `query` through `asyncBucket.query`, flat-maps the result with
   * `RxUtilities.unfoldJsonObjects`, and adapts the observable to a
   * `Future[Option[JsonObject]]` with `zeroOrOneObservableToFuture`.
   */
  def singleResponseQuery(query: N1qlQuery): Future[Option[JsonObject]] = {
    val rows = asyncBucket.query(query).flatMap(RxUtilities.unfoldJsonObjects)
    zeroOrOneObservableToFuture(rows)
  }

  /**
   * Calls `asyncBucket.counter` for `id` with the given `delta` and `initial` values and
   * yields the `content()` of the emitted document as a `Long`.
   */
  def counter(id: String, delta: Long, initial: Long): Future[Long] = {
    val counted = singleObservableToFuture(asyncBucket.counter(id, delta, initial), id)
    // Extracting the content is trivial, so it runs on the parasitic execution context.
    counted.map { doc =>
      val value: Long = doc.content()
      value
    }(ExecutionContexts.parasitic)
  }

  /**
   * Calls `asyncBucket.counter` for `id` with the given `delta` and `initial` values plus the
   * persistence, replication and timeout values (in milliseconds) from `writeSettings`, and
   * yields the `content()` of the emitted document as a `Long`.
   */
  def counter(id: String, delta: Long, initial: Long, writeSettings: CouchbaseWriteSettings): Future[Long] = {
    val timeoutMillis = writeSettings.timeout.toMillis
    val counting = asyncBucket.counter(
      id,
      delta,
      initial,
      writeSettings.persistTo,
      writeSettings.replicateTo,
      timeoutMillis,
      TimeUnit.MILLISECONDS)
    // Extracting the content is trivial, so it runs on the parasitic execution context.
    singleObservableToFuture(counting, id).map { doc =>
      val value: Long = doc.content()
      value
    }(ExecutionContexts.parasitic)
  }

  /**
   * Closes the bucket and then, if `cluster` is defined, disconnects that cluster.
   *
   * When `asyncBucket.isClosed` is already true nothing is done and a completed
   * future is returned.
   */
  def close(): Future[Done] =
    if (asyncBucket.isClosed) {
      Future.successful(Done)
    } else {
      val bucketClosed = singleObservableToFuture(asyncBucket.close(), "close")
      bucketClosed.flatMap { _ =>
        // Disconnect only starts after the bucket close has completed.
        cluster match {
          case None => Future.successful(Done)
          case Some(attachedCluster) =>
            singleObservableToFuture(attachedCluster.disconnect(), "close")
              .map(_ => Done)(ExecutionContexts.global())
        }
      }(ExecutionContexts.global())
    }