public CacheStrategy getCacheStrategy()

in processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java [537:737]


  public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(
      final GroupByQuery query,
      @Nullable final ObjectMapper mapper
  )
  {

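    // Complex-typed dimensions (other than nested data) must be converted back from their generic
    // Jackson-deserialized form when read from the cache, which requires an ObjectMapper. Fail fast
    // here if one was not supplied.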
    for (DimensionSpec dimension : query.getDimensions()) {
      if (dimension.getOutputType().is(ValueType.COMPLEX) && !dimension.getOutputType().equals(ColumnType.NESTED_DATA)) {
        if (mapper == null) {
          throw DruidException.defensive(
              "Cannot deserialize complex dimension of type[%s] from result cache if object mapper is not provided",
              dimension.getOutputType().getComplexTypeName()
          );
        }
      }
    }
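    // Precompute the concrete output class for each dimension so complex values pulled from the cache
    // can be converted back to their proper types.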
    final Class<?>[] dimensionClasses = createDimensionClasses(query);

    return new CacheStrategy<>()
    {
      private static final byte CACHE_STRATEGY_VERSION = 0x1;
      private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
      private final List<DimensionSpec> dims = query.getDimensions();

      @Override
      public boolean isCacheable(GroupByQuery query, boolean willMergeRunners, boolean bySegment)
      {
        // Disable segment-level cache on the Broker; see https://github.com/apache/druid/issues/3820.
        return willMergeRunners || !bySegment;
      }

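      // Segment-level cache key: covers everything that affects per-segment results. The limit spec is
      // included only when limit push-down applies, since only then does it change segment-level output.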
      @Override
      public byte[] computeCacheKey(GroupByQuery query)
      {
        CacheKeyBuilder builder = new CacheKeyBuilder(GROUPBY_QUERY)
            .appendByte(CACHE_STRATEGY_VERSION)
            .appendCacheable(query.getGranularity())
            .appendCacheable(query.getDimFilter())
            .appendCacheables(query.getAggregatorSpecs())
            .appendCacheables(query.getDimensions())
            .appendCacheable(query.getVirtualColumns());
        if (query.isApplyLimitPushDown()) {
          builder.appendCacheable(query.getLimitSpec());
        }
        return builder.build();
      }

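      // Result-level cache key: additionally covers the having spec, limit spec, post-aggregators, and
      // subtotals, since those affect the final merged result.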
      @Override
      public byte[] computeResultLevelCacheKey(GroupByQuery query)
      {
        final CacheKeyBuilder builder = new CacheKeyBuilder(GROUPBY_QUERY)
            .appendByte(CACHE_STRATEGY_VERSION)
            .appendCacheable(query.getGranularity())
            .appendCacheable(query.getDimFilter())
            .appendCacheables(query.getAggregatorSpecs())
            .appendCacheables(query.getDimensions())
            .appendCacheable(query.getVirtualColumns())
            .appendCacheable(query.getHavingSpec())
            .appendCacheable(query.getLimitSpec())
            .appendCacheables(query.getPostAggregatorSpecs());

        if (query.getSubtotalsSpec() != null && !query.getSubtotalsSpec().isEmpty()) {
          for (List<String> subTotalSpec : query.getSubtotalsSpec()) {
            builder.appendStrings(subTotalSpec);
          }
        }
        return builder.build();
      }

      @Override
      public TypeReference<Object> getCacheObjectClazz()
      {
        return OBJECT_TYPE_REFERENCE;
      }

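      // Serializes a ResultRow into a plain List for the cache. Layout: [timestamp, dims..., aggs...],
      // plus post-aggregator values when writing to the result-level cache.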
      @Override
      public Function<ResultRow, Object> prepareForCache(boolean isResultLevelCache)
      {
        final boolean resultRowHasTimestamp = query.getResultRowHasTimestamp();

        return new Function<>()
        {
          @Override
          public Object apply(ResultRow resultRow)
          {
            final List<Object> retVal = new ArrayList<>(1 + dims.size() + aggs.size());
            int inPos = 0;

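            // Always write a timestamp: take it from the row if present, otherwise use the query's
            // universal timestamp.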
            if (resultRowHasTimestamp) {
              retVal.add(resultRow.getLong(inPos++));
            } else {
              retVal.add(query.getUniversalTimestamp().getMillis());
            }

            for (int i = 0; i < dims.size(); i++) {
              retVal.add(resultRow.get(inPos++));
            }
            for (int i = 0; i < aggs.size(); i++) {
              retVal.add(resultRow.get(inPos++));
            }
            if (isResultLevelCache) {
              for (int i = 0; i < query.getPostAggregatorSpecs().size(); i++) {
                retVal.add(resultRow.get(inPos++));
              }
            }
            return retVal;
          }
        };
      }

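      // Reverses prepareForCache: reads the cached List back into a ResultRow, restoring dimension and
      // aggregator values to their proper types.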
      @Override
      public Function<Object, ResultRow> pullFromCache(boolean isResultLevelCache)
      {
        final boolean resultRowHasTimestamp = query.getResultRowHasTimestamp();
        final int dimensionStart = query.getResultRowDimensionStart();
        final int aggregatorStart = query.getResultRowAggregatorStart();
        final int postAggregatorStart = query.getResultRowPostAggregatorStart();

        return new Function<>()
        {
          private final Granularity granularity = query.getGranularity();

          @Override
          public ResultRow apply(Object input)
          {
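            // The cached object is the List produced by prepareForCache, possibly after a JSON round-trip,
            // which is why values below may arrive as generic types and need conversion.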
            Iterator<Object> results = ((List<Object>) input).iterator();

            DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());

            final int size = isResultLevelCache
                             ? query.getResultRowSizeWithPostAggregators()
                             : query.getResultRowSizeWithoutPostAggregators();

            final ResultRow resultRow = ResultRow.create(size);

            if (resultRowHasTimestamp) {
              resultRow.set(0, timestamp.getMillis());
            }

            final Iterator<DimensionSpec> dimsIter = dims.iterator();
            int dimPos = 0;
            while (dimsIter.hasNext() && results.hasNext()) {
              final DimensionSpec dimensionSpec = dimsIter.next();
              final Object dimensionObject = results.next();
              final Object dimensionObjectCasted;

              final ColumnType outputType = dimensionSpec.getOutputType();

              // Convert the generic Jackson-deserialized value into the proper type. Downstream functions expect
              // dimensions to have the appropriate types for further processing such as merging and comparing.
              if (outputType.is(ValueType.COMPLEX)) {
                // JSON columns can interpret generic data objects directly, so they are wrapped as-is in StructuredData.
                // They don't need to be converted from Object.class to StructuredData.class via the object mapper,
                // which would be an expensive and wasteful operation.
                if (outputType.equals(ColumnType.NESTED_DATA)) {
                  dimensionObjectCasted = StructuredData.wrap(dimensionObject);
                } else {
                  dimensionObjectCasted = mapper.convertValue(dimensionObject, dimensionClasses[dimPos]);
                }
              } else {
                dimensionObjectCasted = DimensionHandlerUtils.convertObjectToType(
                    dimensionObject,
                    dimensionSpec.getOutputType()
                );
              }
              resultRow.set(dimensionStart + dimPos, dimensionObjectCasted);
              dimPos++;
            }

            CacheStrategy.fetchAggregatorsFromCache(
                aggs,
                results,
                isResultLevelCache,
                (aggName, aggPosition, aggValueObject) -> {
                  resultRow.set(aggregatorStart + aggPosition, aggValueObject);
                }
            );

            if (isResultLevelCache) {
              for (int postPos = 0; postPos < query.getPostAggregatorSpecs().size(); postPos++) {
                if (!results.hasNext()) {
                  throw DruidException.defensive("Ran out of objects while reading postaggs from cache!");
                }
                resultRow.set(postAggregatorStart + postPos, results.next());
              }
            }
            if (dimsIter.hasNext() || results.hasNext()) {
              throw new ISE(
                  "Found left over objects while reading from cache!! dimsIter[%s] results[%s]",
                  dimsIter.hasNext(),
                  results.hasNext()
              );
            }

            return resultRow;
          }
        };
      }
    };
  }
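
Usage sketch (illustrative only, not part of the source file; "toolChest", "query", "objectMapper", and
"row" stand in for objects assumed to be set up elsewhere, e.g. in a unit test):

  CacheStrategy<ResultRow, Object, GroupByQuery> strategy = toolChest.getCacheStrategy(query, objectMapper);
  Object cached = strategy.prepareForCache(true).apply(row);        // serialize for the result-level cache
  ResultRow restored = strategy.pullFromCache(true).apply(cached);  // restore, including post-aggregator slots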