in processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java [66:243]
/**
 * Scans a single segment for a {@link ScanQuery}, producing the matching rows as batches of {@link ScanResultValue}.
 *
 * <p>Returns an empty sequence when the segment is a tombstone, or when results are unordered and the row-scan count
 * already recorded in {@code responseContext} has reached the query's scan-rows limit. Otherwise rows are read from a
 * {@link CursorHolder} created by the segment's {@link CursorFactory}. Each emitted value holds at most
 * {@code query.getBatchSize()} rows, and the total emitted by this call is capped by
 * {@code calculateRemainingScanRowsLimit(query, responseContext)}. As each batch is produced, its row count is added
 * to the response context's row-scan count.
 *
 * @param query           the scan query; must cover exactly one interval
 * @param segment         the segment to scan
 * @param responseContext read for the current row-scan count and the timeout deadline; updated with rows scanned
 * @param queryMetrics    optional metrics, passed through when building the cursor spec
 * @return a lazily-evaluated sequence of result batches; the cursor holder is attached to it as baggage
 * @throws ISE                      if the segment does not provide a {@link CursorFactory}
 * @throws IllegalArgumentException if the query does not have exactly one interval
 * @throws DruidException           if a time ordering is requested that the cursor's ordering cannot satisfy
 */
public Sequence<ScanResultValue> process(
    final ScanQuery query,
    final Segment segment,
    final ResponseContext responseContext,
    @Nullable final QueryMetrics<?> queryMetrics
)
{
  final Long numScannedRows = responseContext.getRowScanCount();
  // For unordered results, skip this segment entirely once previously scanned rows already satisfy the limit.
  if (numScannedRows != null
      && numScannedRows >= query.getScanRowsLimit()
      && query.getTimeOrder() == Order.NONE) {
    return Sequences.empty();
  }
  if (segment.isTombstone()) {
    return Sequences.empty();
  }

  final boolean hasTimeout = query.context().hasTimeout();
  final Long timeoutAt = responseContext.getTimeoutTime();
  final CursorFactory cursorFactory = segment.as(CursorFactory.class);
  if (cursorFactory == null) {
    throw new ISE(
        "Null cursor factory found. Probably trying to issue a query against a segment being memory unmapped."
    );
  }

  final List<String> allColumns = new ArrayList<>();
  if (query.getColumns() != null && !query.getColumns().isEmpty()) {
    // allColumns equals query.getColumns() exactly. This is nice since it makes the compactedList form easier to use.
    allColumns.addAll(query.getColumns());
  } else {
    // No explicit columns: use every column of the cursor factory's row signature, then each virtual column's
    // output name, de-duplicated while keeping first-seen order.
    final Set<String> availableColumns = Sets.newLinkedHashSet(
        Iterables.concat(
            cursorFactory.getRowSignature().getColumnNames(),
            Iterables.transform(
                Arrays.asList(query.getVirtualColumns().getVirtualColumns()),
                VirtualColumn::getOutputName
            )
        )
    );
    allColumns.addAll(availableColumns);
  }

  final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
  Preconditions.checkArgument(intervals.size() == 1, "Can only handle a single interval, got[%s]", intervals);

  // If the row count is not set, set it to 0, else do nothing.
  responseContext.addRowScanCount(0);
  final long limit = calculateRemainingScanRowsLimit(query, responseContext);

  final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(makeCursorBuildSpec(query, queryMetrics));
  try {
    if (Order.NONE != query.getTimeOrder()
        && Cursors.getTimeOrdering(cursorHolder.getOrdering()) != query.getTimeOrder()) {
      // Format the message while the holder is still open; the catch block below closes it.
      final String failureReason = StringUtils.format(
          "Cannot order by[%s] with direction[%s] on cursor with order[%s].",
          ColumnHolder.TIME_COLUMN_NAME,
          query.getTimeOrder(),
          cursorHolder.getOrdering()
      );
      throw DruidException.forPersona(DruidException.Persona.USER)
                          .ofCategory(DruidException.Category.UNSUPPORTED)
                          .build("%s", failureReason);
    }

    return new BaseSequence<>(
        new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
        {
          @Override
          public Iterator<ScanResultValue> make()
          {
            final Cursor cursor = cursorHolder.asCursor();
            if (cursor == null) {
              return Collections.emptyIterator();
            }
            final List<BaseObjectColumnValueSelector> columnSelectors = new ArrayList<>(allColumns.size());
            final RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
            final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
            for (String column : allColumns) {
              final BaseObjectColumnValueSelector selector = factory.makeColumnValueSelector(column);
              final ColumnCapabilities columnCapabilities = factory.getColumnCapabilities(column);
              rowSignatureBuilder.add(column, ColumnType.fromCapabilities(columnCapabilities));
              columnSelectors.add(selector);
            }

            // The following are identical for every batch of this iterator, so compute them once.
            final RowSignature rowSignature = rowSignatureBuilder.build();
            final String segmentId = segment.getId() == null ? segment.asString() : segment.getId().toString();
            final ScanQuery.ResultFormat resultFormat = query.getResultFormat();
            final int batchSize = query.getBatchSize();

            return new Iterator<>()
            {
              // Number of rows this iterator has emitted so far; never exceeds limit.
              private long offset = 0;

              @Override
              public boolean hasNext()
              {
                return !cursor.isDone() && offset < limit;
              }

              @Override
              public ScanResultValue next()
              {
                if (!hasNext()) {
                  throw new NoSuchElementException();
                }
                // A context may declare a timeout without a recorded deadline; treat that as "no deadline"
                // rather than failing with an unboxing NullPointerException.
                if (hasTimeout && timeoutAt != null && System.currentTimeMillis() >= timeoutAt) {
                  throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
                }
                final long lastOffset = offset;
                final Object events;
                if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) {
                  events = rowsToCompactedList();
                } else if (ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
                  events = rowsToList();
                } else {
                  throw new UOE("resultFormat[%s] is not supported", resultFormat);
                }
                responseContext.addRowScanCount(offset - lastOffset);
                return new ScanResultValue(segmentId, allColumns, events, rowSignature);
              }

              @Override
              public void remove()
              {
                throw new UnsupportedOperationException();
              }

              /**
               * Reads the next batch as one value list per row, in {@code allColumns} order.
               */
              private List<List<Object>> rowsToCompactedList()
              {
                final long iterLimit = Math.min(limit, offset + batchSize);
                // Size to the rows this batch may actually hold (at most batchSize), not to batchSize itself.
                final List<List<Object>> events = new ArrayList<>((int) (iterLimit - offset));
                for (; !cursor.isDone() && offset < iterLimit; cursor.advance(), offset++) {
                  final List<Object> theEvent = new ArrayList<>(allColumns.size());
                  for (int j = 0; j < allColumns.size(); j++) {
                    theEvent.add(getColumnValue(j));
                  }
                  events.add(theEvent);
                }
                return events;
              }

              /**
               * Reads the next batch as one column-name-to-value map per row, in {@code allColumns} order.
               */
              private List<Map<String, Object>> rowsToList()
              {
                final long iterLimit = Math.min(limit, offset + batchSize);
                // Size to the rows this batch may actually hold (at most batchSize), not to batchSize itself.
                final List<Map<String, Object>> events = Lists.newArrayListWithCapacity((int) (iterLimit - offset));
                for (; !cursor.isDone() && offset < iterLimit; cursor.advance(), offset++) {
                  final Map<String, Object> theEvent = new LinkedHashMap<>();
                  for (int j = 0; j < allColumns.size(); j++) {
                    theEvent.put(allColumns.get(j), getColumnValue(j));
                  }
                  events.add(theEvent);
                }
                return events;
              }

              /**
               * Returns the current row's value for the i-th column, or null when that column has no selector.
               */
              private Object getColumnValue(int i)
              {
                final BaseObjectColumnValueSelector selector = columnSelectors.get(i);
                return selector == null ? null : selector.getObject();
              }
            };
          }

          @Override
          public void cleanup(Iterator<ScanResultValue> iterFromMake)
          {
            // Nothing to release per iterator: the cursor holder is attached to the sequence as baggage.
          }
        }
    ).withBaggage(cursorHolder);
  }
  catch (Throwable t) {
    // Until withBaggage() hands the holder to the returned sequence, this method owns it and must release it.
    try {
      cursorHolder.close();
    }
    catch (Throwable closeFailure) {
      t.addSuppressed(closeFailure);
    }
    throw t;
  }
}