in scylla/src/main/java/site/ycsb/db/scylla/ScyllaCQLClient.java [373:447]
/**
 * Reads up to {@code recordcount} rows starting from {@code startkey} and appends them
 * to {@code result}.
 *
 * <p>The query selects rows whose partition-key token is greater than or equal to the
 * token of {@code startkey}, limited to {@code recordcount} rows. The prepared statement
 * is built on first use and cached: one shared statement when all columns are requested,
 * otherwise one statement per distinct field set.
 *
 * @param table the table to scan
 * @param startkey the key whose token marks the start of the scan
 * @param recordcount the maximum number of rows to return (bound to the LIMIT clause)
 * @param fields the columns to read, or {@code null} to read all columns
 * @param result receives one map per row, keyed by column name; a column whose value is
 *     null in the row is stored with a {@code null} map value
 * @return {@code Status.OK} on success; {@code Status.ERROR} if any exception is thrown
 *     (rows already appended to {@code result} before the failure are left in place)
 */
public Status scan(String table, String startkey, int recordcount,
    Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
  try {
    PreparedStatement stmt = (fields == null) ? SCAN_ALL_STMT.get() : SCAN_STMTS.get(fields);
    // Prepare statement on demand
    if (stmt == null) {
      Select.Builder selectBuilder;
      if (fields == null) {
        selectBuilder = QueryBuilder.select().all();
      } else {
        selectBuilder = QueryBuilder.select();
        for (String col : fields) {
          ((Select.Selection) selectBuilder).column(col);
        }
      }
      Select selectStmt = selectBuilder.from(table);
      // The statement builder is not setup right for tokens.
      // So, we need to build it manually.
      String initialStmt = selectStmt.toString();
      // Drop the last character of the generated statement before appending the
      // WHERE/LIMIT clauses.
      String scanStmt = initialStmt.substring(0, initialStmt.length() - 1) +
          " WHERE " + QueryBuilder.token(YCSB_KEY) +
          " >= token(" + QueryBuilder.bindMarker() + ")" +
          " LIMIT " + QueryBuilder.bindMarker();
      stmt = session.prepare(scanStmt);
      stmt.setConsistencyLevel(readConsistencyLevel);
      if (trace) {
        stmt.enableTracing();
      }
      // Publish the statement. The field set is copied so the cache key cannot be
      // affected by later changes to the caller's set.
      PreparedStatement prevStmt = (fields == null) ?
          SCAN_ALL_STMT.getAndSet(stmt) :
          SCAN_STMTS.putIfAbsent(new HashSet<>(fields), stmt);
      // If a statement was already cached when ours was published, use that one.
      if (prevStmt != null) {
        stmt = prevStmt;
      }
    }
    LOGGER.debug(stmt.getQueryString());
    LOGGER.debug("startKey = {}, recordcount = {}", startkey, recordcount);
    ResultSet rs = session.execute(stmt.bind(startkey, recordcount));
    while (!rs.isExhausted()) {
      Row row = rs.one();
      HashMap<String, ByteIterator> tuple = new HashMap<>();
      for (ColumnDefinitions.Definition def : row.getColumnDefinitions()) {
        ByteBuffer val = row.getBytesUnsafe(def.getName());
        if (val != null) {
          // Copy only the bytes between position and limit. ByteBuffer.array() would
          // return the whole backing array (ignoring position/limit/arrayOffset) and
          // throws for direct or read-only buffers. duplicate() keeps the original
          // buffer's position untouched.
          byte[] bytes = new byte[val.remaining()];
          val.duplicate().get(bytes);
          tuple.put(def.getName(), new ByteArrayByteIterator(bytes));
        } else {
          tuple.put(def.getName(), null);
        }
      }
      result.add(tuple);
    }
    return Status.OK;
  } catch (Exception e) {
    LOGGER.error(
        MessageFormatter.format("Error scanning with startkey: {}", startkey).getMessage(), e);
    return Status.ERROR;
  }
}