public void aggregate()

in pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java [192:438]


  /**
   * Feeds the first {@code length} rows of the current block into the sketches kept in the result holder.
   *
   * <p>The main expression is always at index 0 of the arrays filled by {@code extractValues}. When its type is
   * {@code BYTES}, the values are handed to {@code deserializeSketches} and the resulting sketches are applied to the
   * accumulators returned by {@code getUnions}. For every other type the raw values are pushed into the update
   * sketches returned by {@code getUpdateSketches}; multi-value rows contribute each of their elements individually.
   *
   * <p>In both paths slot 0 is the default sketch, which is only touched when {@code _includeDefaultSketch} is set
   * and then receives every row. Slot {@code i + 1} is paired with filter evaluator {@code i} and only receives the
   * rows that evaluator accepts.
   *
   * @param length number of rows to process
   * @param aggregationResultHolder holder from which the sketches / accumulators are obtained
   * @param blockValSetMap block value sets, forwarded to {@code extractValues}
   * @throws IllegalStateException if the main expression has a data type that is not handled
   */
  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
      Map<ExpressionContext, BlockValSet> blockValSetMap) {
    int expressionCount = _inputExpressions.size();
    boolean[] singleValues = new boolean[expressionCount];
    DataType[] valueTypes = new DataType[expressionCount];
    Object[] valueArrays = new Object[expressionCount];
    extractValues(blockValSetMap, singleValues, valueTypes, valueArrays);
    int filterCount = _filterEvaluators.size();

    // Index 0 always holds the main expression
    DataType mainType = valueTypes[0];

    if (mainType == DataType.BYTES) {
      // Serialized sketches: deserialize once, then apply to the accumulators
      List<ThetaSketchAccumulator> accumulators = getUnions(aggregationResultHolder);
      Sketch[] rowSketches = deserializeSketches((byte[][]) valueArrays[0], length);
      if (_includeDefaultSketch) {
        ThetaSketchAccumulator unfiltered = accumulators.get(0);
        for (Sketch rowSketch : rowSketches) {
          unfiltered.apply(rowSketch);
        }
      }
      // Accumulator slot k (k >= 1) is paired with filter evaluator k - 1
      for (int slot = 1; slot <= filterCount; slot++) {
        FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
        ThetaSketchAccumulator filtered = accumulators.get(slot);
        for (int row = 0; row < length; row++) {
          if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
            filtered.apply(rowSketches[row]);
          }
        }
      }
      return;
    }

    // Raw values: sketch slot 0 is the default sketch, slot k (k >= 1) is paired with filter evaluator k - 1
    List<UpdateSketch> sketches = getUpdateSketches(aggregationResultHolder);

    if (singleValues[0]) {
      // Single-value column: one value per row
      switch (mainType) {
        case INT: {
          int[] values = (int[]) valueArrays[0];
          if (_includeDefaultSketch) {
            UpdateSketch unfiltered = sketches.get(0);
            for (int row = 0; row < length; row++) {
              unfiltered.update(values[row]);
            }
          }
          for (int slot = 1; slot <= filterCount; slot++) {
            FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
            UpdateSketch filtered = sketches.get(slot);
            for (int row = 0; row < length; row++) {
              if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
                filtered.update(values[row]);
              }
            }
          }
          break;
        }
        case LONG: {
          long[] values = (long[]) valueArrays[0];
          if (_includeDefaultSketch) {
            UpdateSketch unfiltered = sketches.get(0);
            for (int row = 0; row < length; row++) {
              unfiltered.update(values[row]);
            }
          }
          for (int slot = 1; slot <= filterCount; slot++) {
            FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
            UpdateSketch filtered = sketches.get(slot);
            for (int row = 0; row < length; row++) {
              if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
                filtered.update(values[row]);
              }
            }
          }
          break;
        }
        case FLOAT: {
          float[] values = (float[]) valueArrays[0];
          if (_includeDefaultSketch) {
            UpdateSketch unfiltered = sketches.get(0);
            for (int row = 0; row < length; row++) {
              unfiltered.update(values[row]);
            }
          }
          for (int slot = 1; slot <= filterCount; slot++) {
            FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
            UpdateSketch filtered = sketches.get(slot);
            for (int row = 0; row < length; row++) {
              if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
                filtered.update(values[row]);
              }
            }
          }
          break;
        }
        case DOUBLE: {
          double[] values = (double[]) valueArrays[0];
          if (_includeDefaultSketch) {
            UpdateSketch unfiltered = sketches.get(0);
            for (int row = 0; row < length; row++) {
              unfiltered.update(values[row]);
            }
          }
          for (int slot = 1; slot <= filterCount; slot++) {
            FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
            UpdateSketch filtered = sketches.get(slot);
            for (int row = 0; row < length; row++) {
              if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
                filtered.update(values[row]);
              }
            }
          }
          break;
        }
        case STRING: {
          String[] values = (String[]) valueArrays[0];
          if (_includeDefaultSketch) {
            UpdateSketch unfiltered = sketches.get(0);
            for (int row = 0; row < length; row++) {
              unfiltered.update(values[row]);
            }
          }
          for (int slot = 1; slot <= filterCount; slot++) {
            FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
            UpdateSketch filtered = sketches.get(slot);
            for (int row = 0; row < length; row++) {
              if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
                filtered.update(values[row]);
              }
            }
          }
          break;
        }
        default:
          throw new IllegalStateException(
              "Illegal single-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: " + mainType);
      }
      return;
    }

    // Multi-value column: every element of an accepted row is fed to the sketch on its own
    switch (mainType) {
      case INT: {
        int[][] rows = (int[][]) valueArrays[0];
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = sketches.get(0);
          for (int row = 0; row < length; row++) {
            for (int element : rows[row]) {
              unfiltered.update(element);
            }
          }
        }
        for (int slot = 1; slot <= filterCount; slot++) {
          FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
          UpdateSketch filtered = sketches.get(slot);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              for (int element : rows[row]) {
                filtered.update(element);
              }
            }
          }
        }
        break;
      }
      case LONG: {
        long[][] rows = (long[][]) valueArrays[0];
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = sketches.get(0);
          for (int row = 0; row < length; row++) {
            for (long element : rows[row]) {
              unfiltered.update(element);
            }
          }
        }
        for (int slot = 1; slot <= filterCount; slot++) {
          FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
          UpdateSketch filtered = sketches.get(slot);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              for (long element : rows[row]) {
                filtered.update(element);
              }
            }
          }
        }
        break;
      }
      case FLOAT: {
        float[][] rows = (float[][]) valueArrays[0];
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = sketches.get(0);
          for (int row = 0; row < length; row++) {
            for (float element : rows[row]) {
              unfiltered.update(element);
            }
          }
        }
        for (int slot = 1; slot <= filterCount; slot++) {
          FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
          UpdateSketch filtered = sketches.get(slot);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              for (float element : rows[row]) {
                filtered.update(element);
              }
            }
          }
        }
        break;
      }
      case DOUBLE: {
        double[][] rows = (double[][]) valueArrays[0];
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = sketches.get(0);
          for (int row = 0; row < length; row++) {
            for (double element : rows[row]) {
              unfiltered.update(element);
            }
          }
        }
        for (int slot = 1; slot <= filterCount; slot++) {
          FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
          UpdateSketch filtered = sketches.get(slot);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              for (double element : rows[row]) {
                filtered.update(element);
              }
            }
          }
        }
        break;
      }
      case STRING: {
        String[][] rows = (String[][]) valueArrays[0];
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = sketches.get(0);
          for (int row = 0; row < length; row++) {
            for (String element : rows[row]) {
              unfiltered.update(element);
            }
          }
        }
        for (int slot = 1; slot <= filterCount; slot++) {
          FilterEvaluator evaluator = _filterEvaluators.get(slot - 1);
          UpdateSketch filtered = sketches.get(slot);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              for (String element : rows[row]) {
                filtered.update(element);
              }
            }
          }
        }
        break;
      }
      default:
        throw new IllegalStateException(
            "Illegal multi-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: " + mainType);
    }
  }