public Status scan()

in scylla/src/main/java/site/ycsb/db/scylla/ScyllaCQLClient.java [373:447]


  /**
   * Performs a range scan and appends one map per returned row to {@code result}.
   *
   * <p>The query selects rows whose {@code token(YCSB_KEY)} is greater than or equal to the token
   * of {@code startkey}, limited to {@code recordcount} rows. The prepared statement is created on
   * first use and cached: in {@code SCAN_ALL_STMT} when {@code fields} is {@code null}, otherwise
   * in {@code SCAN_STMTS} keyed by a copy of the field set. Note that the cache lookup does not
   * take {@code table} into account.
   *
   * @param table       name of the table to scan
   * @param startkey    key whose token marks the (inclusive) start of the scan
   * @param recordcount maximum number of rows to return (bound to the {@code LIMIT} clause)
   * @param fields      columns to select, or {@code null} to select all columns
   * @param result      collection that receives one column-name-to-value map per row; a column
   *                    whose value is null is stored with a {@code null} map value
   * @return {@code Status.OK} on success, {@code Status.ERROR} if any exception was thrown
   *         (the exception is logged)
   */
  public Status scan(String table, String startkey, int recordcount,
                     Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {

    try {
      PreparedStatement stmt = (fields == null) ? SCAN_ALL_STMT.get() : SCAN_STMTS.get(fields);

      // Prepare statement on demand
      if (stmt == null) {
        Select.Builder selectBuilder;

        if (fields == null) {
          selectBuilder = QueryBuilder.select().all();
        } else {
          selectBuilder = QueryBuilder.select();
          for (String col : fields) {
            ((Select.Selection) selectBuilder).column(col);
          }
        }

        Select selectStmt = selectBuilder.from(table);

        // The statement builder is not setup right for tokens.
        // So, we need to build it manually: drop the last character of the generated
        // statement text and append the token-based WHERE clause and the LIMIT.
        String initialStmt = selectStmt.toString();
        String scanStmt = initialStmt.substring(0, initialStmt.length() - 1) +
            " WHERE " + QueryBuilder.token(YCSB_KEY) +
            " >= token(" + QueryBuilder.bindMarker() + ")" +
            " LIMIT " + QueryBuilder.bindMarker();
        stmt = session.prepare(scanStmt);
        stmt.setConsistencyLevel(readConsistencyLevel);
        if (trace) {
          stmt.enableTracing();
        }

        // Publish the statement to the cache. If another statement was already stored
        // in the meantime, use that one for this call instead of the one just prepared.
        PreparedStatement prevStmt = (fields == null) ?
            SCAN_ALL_STMT.getAndSet(stmt) :
            SCAN_STMTS.putIfAbsent(new HashSet<>(fields), stmt);
        if (prevStmt != null) {
          stmt = prevStmt;
        }
      }

      LOGGER.debug(stmt.getQueryString());
      LOGGER.debug("startKey = {}, recordcount = {}", startkey, recordcount);

      ResultSet rs = session.execute(stmt.bind(startkey, recordcount));

      while (!rs.isExhausted()) {
        Row row = rs.one();
        HashMap<String, ByteIterator> tuple = new HashMap<>();

        ColumnDefinitions cd = row.getColumnDefinitions();

        for (ColumnDefinitions.Definition def : cd) {
          ByteBuffer val = row.getBytesUnsafe(def.getName());
          if (val != null) {
            // Copy exactly the readable bytes (position..limit). ByteBuffer.array() would
            // hand back the whole backing array regardless of position/arrayOffset, and it
            // throws for read-only or direct buffers. duplicate() keeps the row's own
            // buffer position untouched.
            byte[] bytes = new byte[val.remaining()];
            val.duplicate().get(bytes);
            tuple.put(def.getName(), new ByteArrayByteIterator(bytes));
          } else {
            tuple.put(def.getName(), null);
          }
        }

        result.add(tuple);
      }

      return Status.OK;

    } catch (Exception e) {
      LOGGER.error(
          MessageFormatter.format("Error scanning with startkey: {}", startkey).getMessage(), e);
      return Status.ERROR;
    }

  }