private def client()

in s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala [38:111]


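  // When the caller asks to wait for acks, use the client that flushes its RPC buffer immediately
  // (inferred from the name clientWithFlush); otherwise use the default buffered client.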
  private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client

  /**
    * Decides how to store the given key values (Seq[SKeyValue]) into storage using the storage's client.
    * Note that this should report success only when every mutation succeeds.
    * We assume that each storage implementation holds its client as a member variable.
    *
    * @param cluster  : where these key values should be stored.
    * @param kvs      : sequence of SKeyValue that needs to be stored in storage.
    * @param withWait : flag that controls waiting for the ack from storage.
    *                 Note that in AsynchbaseStorage (which supports asynchronous operations), even with true,
    *                 this never blocks a thread; it submits the work and is notified by the event loop when storage sends the ack back.
    * @return ack message from storage.
    */
  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
    if (kvs.isEmpty) Future.successful(MutateResponse.Success)
    else {
      val _client = client(withWait)
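      // Counter increments take a different RPC path (AtomicIncrementRequest) than puts/deletes, so split them out.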
      val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)

      /* Asynchbase IncrementRequest does not implement HasQualifiers */
      val incrementsFutures = increments.map { kv =>
        val countVal = Bytes.toLong(kv.value)
        val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, countVal)
        val fallbackFn: (Exception => MutateResponse) = { ex =>
          logger.error(s"mutation failed. $request", ex)
          new IncrementResponse(false, -1L, -1L)
        }
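        // Bridge the Asynchbase Deferred into a Scala Future: a successful RPC yields the new counter
        // value wrapped in an IncrementResponse, while errors go through fallbackFn above.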
        val future = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long =>
          new IncrementResponse(true, resultCount.longValue(), countVal)
        }.toFuture(MutateResponse.IncrementFailure)

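        // When withWait is false, the increment stays buffered in Asynchbase and success is reported optimistically.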
        if (withWait) future else Future.successful(MutateResponse.IncrementSuccess)
      }

      /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
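      // Group by (table, row, cf, operation, timestamp) so each group collapses into a single RPC.
      // Byte arrays are wrapped in Seq because groupBy needs structural equality, which Array lacks.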
      val othersFutures = putAndDeletes.groupBy { kv =>
        (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
      }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>

        val durability = groupedKeyValues.head.durability
        val qualifiers = new ArrayBuffer[Array[Byte]]()
        val values = new ArrayBuffer[Array[Byte]]()

        groupedKeyValues.foreach { kv =>
          if (kv.qualifier != null) qualifiers += kv.qualifier
          if (kv.value != null) values += kv.value
        }
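        // Build one multi-qualifier request covering every column in the group.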
        val defer = operation match {
          case SKeyValue.Put =>
            val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp)
            put.setDurable(durability)
            _client.put(put)
          case SKeyValue.Delete =>
            val delete =
              if (qualifiers.isEmpty)
                new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp)
              else
                new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp)
            delete.setDurable(durability)
            _client.delete(delete)
        }
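        // When waiting, surface per-group failures: log each key value and report MutateResponse.Failure.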
        if (withWait) {
          defer.toFuture(new AnyRef()).map(_ => MutateResponse.Success).recover { case ex: Exception =>
            groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) }
            MutateResponse.Failure
          }
        } else Future.successful(MutateResponse.Success)
      }
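      // The overall write succeeds only if every increment, put, and delete succeeded.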
      for {
        incrementRets <- Future.sequence(incrementsFutures)
        otherRets <- Future.sequence(othersFutures)
      } yield new MutateResponse(isSuccess = (incrementRets ++ otherRets).forall(_.isSuccess))
    }
  }
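
For reference, a minimal sketch of how a caller might drive writeToStorage. Everything here is illustrative: the `mutator` value, the table/cf/qualifier names, and the SKeyValue field names are assumptions, not taken from this file, and the enclosing package's SKeyValue and MutateResponse types are presumed in scope.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical caller: `mutator` is assumed to be an AsynchbaseOptimisticMutator instance.
val put = SKeyValue(
  table = Bytes.toBytes("s2graph"),        // assumed table name
  row = Bytes.toBytes("rowKey"),
  cf = Bytes.toBytes("e"),
  qualifier = Bytes.toBytes("qualifier"),
  value = Bytes.toBytes("value"),
  timestamp = System.currentTimeMillis(),
  operation = SKeyValue.Put)

// Counter deltas are 8-byte longs, matching the Bytes.toLong(kv.value) decode above.
val increment = put.copy(
  qualifier = Bytes.toBytes("count"),
  value = Bytes.toBytes(1L),
  operation = SKeyValue.Increment)

// withWait = true: the returned Future completes only after HBase acks every mutation.
val result: Future[MutateResponse] =
  mutator.writeToStorage("hbase-cluster", Seq(put, increment), withWait = true)

With withWait = false, the same call returns immediately with MutateResponse.Success and the mutations are flushed by Asynchbase in the background.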