in processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java [537:737]
public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(
final GroupByQuery query,
@Nullable final ObjectMapper mapper
)
{
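// Complex (non-nested) dimension values come out of the cache in a generic Jackson-deserialized form; converting
// them back to their concrete types requires an ObjectMapper, so fail fast here if one was not provided.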
for (DimensionSpec dimension : query.getDimensions()) {
if (dimension.getOutputType().is(ValueType.COMPLEX) && !dimension.getOutputType().equals(ColumnType.NESTED_DATA)) {
if (mapper == null) {
throw DruidException.defensive(
"Cannot deserialize complex dimension of type[%s] from result cache if object mapper is not provided",
dimension.getOutputType().getComplexTypeName()
);
}
}
}
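// Resolve the concrete Java class of each dimension's output type up front, so complex dimension values can be
// converted back from their cached form in pullFromCache.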
final Class<?>[] dimensionClasses = createDimensionClasses(query);
return new CacheStrategy<>()
{
private static final byte CACHE_STRATEGY_VERSION = 0x1;
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<DimensionSpec> dims = query.getDimensions();
@Override
public boolean isCacheable(GroupByQuery query, boolean willMergeRunners, boolean bySegment)
{
// Disable segment-level cache on the broker,
// see PR https://github.com/apache/druid/issues/3820
return willMergeRunners || !bySegment;
}
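// Segment-level cache key: covers granularity, filter, aggregators, dimensions, and virtual columns. The limit
// spec is included only when limit push-down applies, since only then does it affect per-segment results.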
@Override
public byte[] computeCacheKey(GroupByQuery query)
{
CacheKeyBuilder builder = new CacheKeyBuilder(GROUPBY_QUERY)
.appendByte(CACHE_STRATEGY_VERSION)
.appendCacheable(query.getGranularity())
.appendCacheable(query.getDimFilter())
.appendCacheables(query.getAggregatorSpecs())
.appendCacheables(query.getDimensions())
.appendCacheable(query.getVirtualColumns());
if (query.isApplyLimitPushDown()) {
builder.appendCacheable(query.getLimitSpec());
}
return builder.build();
}
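// Result-level cache key: everything in the segment-level key, plus the having spec, limit spec, post-aggregators,
// and any subtotals spec, since all of these affect the final merged result.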
@Override
public byte[] computeResultLevelCacheKey(GroupByQuery query)
{
final CacheKeyBuilder builder = new CacheKeyBuilder(GROUPBY_QUERY)
.appendByte(CACHE_STRATEGY_VERSION)
.appendCacheable(query.getGranularity())
.appendCacheable(query.getDimFilter())
.appendCacheables(query.getAggregatorSpecs())
.appendCacheables(query.getDimensions())
.appendCacheable(query.getVirtualColumns())
.appendCacheable(query.getHavingSpec())
.appendCacheable(query.getLimitSpec())
.appendCacheables(query.getPostAggregatorSpecs());
if (query.getSubtotalsSpec() != null && !query.getSubtotalsSpec().isEmpty()) {
for (List<String> subTotalSpec : query.getSubtotalsSpec()) {
builder.appendStrings(subTotalSpec);
}
}
return builder.build();
}
@Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
}
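// Flattens a ResultRow into a plain List<Object> for the cache: [timestamp, dimensions..., aggregators...], with
// post-aggregator values appended when preparing for the result-level cache.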
@Override
public Function<ResultRow, Object> prepareForCache(boolean isResultLevelCache)
{
final boolean resultRowHasTimestamp = query.getResultRowHasTimestamp();
return new Function<>()
{
@Override
public Object apply(ResultRow resultRow)
{
final List<Object> retVal = new ArrayList<>(1 + dims.size() + aggs.size());
int inPos = 0;
if (resultRowHasTimestamp) {
retVal.add(resultRow.getLong(inPos++));
} else {
retVal.add(query.getUniversalTimestamp().getMillis());
}
for (int i = 0; i < dims.size(); i++) {
retVal.add(resultRow.get(inPos++));
}
for (int i = 0; i < aggs.size(); i++) {
retVal.add(resultRow.get(inPos++));
}
if (isResultLevelCache) {
for (int i = 0; i < query.getPostAggregatorSpecs().size(); i++) {
retVal.add(resultRow.get(inPos++));
}
}
return retVal;
}
};
}
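// Rebuilds a ResultRow from the cached List<Object>, restoring the timestamp (when the row layout includes one)
// and converting dimension and aggregator values back to the types downstream processing expects.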
@Override
public Function<Object, ResultRow> pullFromCache(boolean isResultLevelCache)
{
final boolean resultRowHasTimestamp = query.getResultRowHasTimestamp();
final int dimensionStart = query.getResultRowDimensionStart();
final int aggregatorStart = query.getResultRowAggregatorStart();
final int postAggregatorStart = query.getResultRowPostAggregatorStart();
return new Function<>()
{
private final Granularity granularity = query.getGranularity();
@Override
public ResultRow apply(Object input)
{
Iterator<Object> results = ((List<Object>) input).iterator();
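// The first cached element is always the row timestamp, whether or not the row layout carries one.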
DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());
final int size = isResultLevelCache
? query.getResultRowSizeWithPostAggregators()
: query.getResultRowSizeWithoutPostAggregators();
final ResultRow resultRow = ResultRow.create(size);
if (resultRowHasTimestamp) {
resultRow.set(0, timestamp.getMillis());
}
final Iterator<DimensionSpec> dimsIter = dims.iterator();
int dimPos = 0;
while (dimsIter.hasNext() && results.hasNext()) {
final DimensionSpec dimensionSpec = dimsIter.next();
final Object dimensionObject = results.next();
final Object dimensionObjectCasted;
final ColumnType outputType = dimensionSpec.getOutputType();
// Must convert generic Jackson-deserialized type into the proper type. The downstream functions expect the
// dimensions to be of appropriate types for further processing like merging and comparing.
if (outputType.is(ValueType.COMPLEX)) {
// Json columns can interpret generic data objects directly, so they are wrapped as-is in StructuredData.
// They do not need to be converted from Object.class to StructuredData.class via the object mapper, which
// would be an expensive and wasteful operation.
if (outputType.equals(ColumnType.NESTED_DATA)) {
dimensionObjectCasted = StructuredData.wrap(dimensionObject);
} else {
dimensionObjectCasted = mapper.convertValue(dimensionObject, dimensionClasses[dimPos]);
}
} else {
dimensionObjectCasted = DimensionHandlerUtils.convertObjectToType(
dimensionObject,
dimensionSpec.getOutputType()
);
}
resultRow.set(dimensionStart + dimPos, dimensionObjectCasted);
dimPos++;
}
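// Read aggregator values from the cached list and place them at the aggregator offsets of the row.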
CacheStrategy.fetchAggregatorsFromCache(
aggs,
results,
isResultLevelCache,
(aggName, aggPosition, aggValueObject) -> {
resultRow.set(aggregatorStart + aggPosition, aggValueObject);
}
);
if (isResultLevelCache) {
for (int postPos = 0; postPos < query.getPostAggregatorSpecs().size(); postPos++) {
if (!results.hasNext()) {
throw DruidException.defensive("Ran out of objects while reading postaggs from cache!");
}
resultRow.set(postAggregatorStart + postPos, results.next());
}
}
if (dimsIter.hasNext() || results.hasNext()) {
throw new ISE(
"Found left over objects while reading from cache!! dimsIter[%s] results[%s]",
dimsIter.hasNext(),
results.hasNext()
);
}
return resultRow;
}
};
}
};
}