public Status scan()

in riak/src/main/java/site/ycsb/db/riak/RiakKVClient.java [280:358]


  /**
   * Scans up to {@code recordcount} records, starting from {@code startkey}, by running a secondary-index (2i)
   * range query on the integer index named "key" and then fetching every object the query returned.
   *
   * <p>The index query covers the range from {@code getKeyAsLong(startkey)} to {@code Long.MAX_VALUE}, is limited
   * to {@code recordcount} results and is executed against the {@code bucketType2i} bucket type. When strong
   * consistency is in use, the objects themselves are read from {@code bucketType}, reusing only the key reported
   * by the index entry.
   *
   * <p>Records are appended to {@code result} one at a time. If the scan stops early (a record is not found, a
   * timeout occurs or another error is raised), the records appended before the failure remain in {@code result}.
   *
   * @param table       name of the bucket to scan
   * @param startkey    key of the first record to read; converted with {@code getKeyAsLong} to build the query
   * @param recordcount maximum number of index entries requested from the query
   * @param fields      fields to read; forwarded as-is to {@code createResultHashMap}
   * @param result      vector to which one map per successfully fetched record is appended
   * @return {@code Status.NOT_IMPLEMENTED} if strong consistency is enabled but strongly consistent scans are not;
   *         {@code Status.NOT_FOUND} if the index query returned no entries or a fetched record was reported as
   *         not found; {@code TIME_OUT} if a {@link TimeoutException} was raised; {@code Status.ERROR} on any other
   *         exception; {@code Status.OK} otherwise
   */
  public Status scan(String table, String startkey, int recordcount, Set<String> fields,
                     Vector<HashMap<String, ByteIterator>> result) {
    if (strongConsistency && !performStrongConsistentScans) {
      return Status.NOT_IMPLEMENTED;
    }

    // The strong consistent bucket-type is not capable of storing 2i indexes. So, we need to read them from the fake
    // one (which we use only to store indexes). This is why, when using such a consistency model, the bucketType2i
    // variable is set to FAKE_BUCKET_TYPE.
    IntIndexQuery iiq = new IntIndexQuery
        .Builder(new Namespace(bucketType2i, table), "key", getKeyAsLong(startkey), Long.MAX_VALUE)
        .withMaxResults(recordcount)
        .withPaginationSort(true)
        .build();

    RiakFuture<IntIndexQuery.Response, IntIndexQuery> future = riakClient.executeAsync(iiq);

    try {
      IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
      List<IntIndexQuery.Response.Entry> entries = response.getEntries();

      // If no entries were retrieved, then something bad happened...
      if (entries.isEmpty()) {
        if (debug) {
          System.err.println("Unable to scan any record starting from key " + startkey + ", aborting transaction. " +
              "Reason: NOT FOUND");
        }

        return Status.NOT_FOUND;
      }

      for (IntIndexQuery.Response.Entry entry : entries) {
        // If strong consistency is in use, then the actual location of the object we want to read is obtained by
        // fetching the key from the one retrieved with the 2i indexes search operation.
        final Location location;

        if (strongConsistency) {
          location = new Location(new Namespace(bucketType, table), entry.getRiakObjectLocation().getKeyAsString());
        } else {
          location = entry.getRiakObjectLocation();
        }

        FetchValue fv = new FetchValue.Builder(location)
            .withOption(FetchValue.Option.R, rvalue)
            .build();

        FetchValue.Response keyResponse = fetch(fv);

        if (keyResponse.isNotFound()) {
          if (debug) {
            System.err.println("Unable to scan all requested records starting from key " + startkey + ", aborting " +
                "transaction. Reason: NOT FOUND");
          }

          return Status.NOT_FOUND;
        }

        // Create the partial result to add to the result vector.
        HashMap<String, ByteIterator> partialResult = new HashMap<>();
        createResultHashMap(fields, keyResponse, partialResult);
        result.add(partialResult);
      }
    } catch (TimeoutException e) {
      if (debug) {
        System.err.println("Unable to scan all requested records starting from key " + startkey + ", aborting " +
            "transaction. Reason: TIME OUT");
      }

      return TIME_OUT;
    } catch (Exception e) {
      // Catching the exception clears nothing by itself, but the interrupt status was already cleared when the
      // InterruptedException was thrown. Re-assert it so that the code running this thread can still observe the
      // interruption request instead of having it silently swallowed here.
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }

      if (debug) {
        System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
            "Reason: " + e.toString());
      }

      return Status.ERROR;
    }

    return Status.OK;
  }