public void aggregateGroupBySV()

in pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java [441:662]


  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
      Map<ExpressionContext, BlockValSet> blockValSetMap) {
    int numExpressions = _inputExpressions.size();
    boolean[] singleValues = new boolean[numExpressions];
    DataType[] valueTypes = new DataType[numExpressions];
    Object[] valueArrays = new Object[numExpressions];
    extractValues(blockValSetMap, singleValues, valueTypes, valueArrays);
    int numFilters = _filterEvaluators.size();

    // Main expression is always index 0
    if (valueTypes[0] != DataType.BYTES) {
      if (singleValues[0]) {
        switch (valueTypes[0]) {
          case INT:
            int[] intValues = (int[]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              int value = intValues[i];
              if (_includeDefaultSketch) {
                updateSketches.get(0).update(value);
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  updateSketches.get(j + 1).update(value);
                }
              }
            }
            break;
          case LONG:
            long[] longValues = (long[]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              long value = longValues[i];
              if (_includeDefaultSketch) {
                updateSketches.get(0).update(value);
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  updateSketches.get(j + 1).update(value);
                }
              }
            }
            break;
          case FLOAT:
            float[] floatValues = (float[]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              float value = floatValues[i];
              if (_includeDefaultSketch) {
                updateSketches.get(0).update(value);
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  updateSketches.get(j + 1).update(value);
                }
              }
            }
            break;
          case DOUBLE:
            double[] doubleValues = (double[]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              double value = doubleValues[i];
              if (_includeDefaultSketch) {
                updateSketches.get(0).update(value);
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  updateSketches.get(j + 1).update(value);
                }
              }
            }
            break;
          case STRING:
            String[] stringValues = (String[]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              String value = stringValues[i];
              if (_includeDefaultSketch) {
                updateSketches.get(0).update(value);
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  updateSketches.get(j + 1).update(value);
                }
              }
            }
            break;
          default:
            throw new IllegalStateException(
                "Illegal single-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: "
                    + valueTypes[0]);
        }
      } else {
        switch (valueTypes[0]) {
          case INT:
            int[][] intValues = (int[][]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              int[] values = intValues[i];
              if (_includeDefaultSketch) {
                UpdateSketch defaultSketch = updateSketches.get(0);
                for (int value : values) {
                  defaultSketch.update(value);
                }
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  UpdateSketch updateSketch = updateSketches.get(j + 1);
                  for (int value : values) {
                    updateSketch.update(value);
                  }
                }
              }
            }
            break;
          case LONG:
            long[][] longValues = (long[][]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              long[] values = longValues[i];
              if (_includeDefaultSketch) {
                UpdateSketch defaultSketch = updateSketches.get(0);
                for (long value : values) {
                  defaultSketch.update(value);
                }
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  UpdateSketch updateSketch = updateSketches.get(j + 1);
                  for (long value : values) {
                    updateSketch.update(value);
                  }
                }
              }
            }
            break;
          case FLOAT:
            float[][] floatValues = (float[][]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              float[] values = floatValues[i];
              if (_includeDefaultSketch) {
                UpdateSketch defaultSketch = updateSketches.get(0);
                for (float value : values) {
                  defaultSketch.update(value);
                }
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  UpdateSketch updateSketch = updateSketches.get(j + 1);
                  for (float value : values) {
                    updateSketch.update(value);
                  }
                }
              }
            }
            break;
          case DOUBLE:
            double[][] doubleValues = (double[][]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              double[] values = doubleValues[i];
              if (_includeDefaultSketch) {
                UpdateSketch defaultSketch = updateSketches.get(0);
                for (double value : values) {
                  defaultSketch.update(value);
                }
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  UpdateSketch updateSketch = updateSketches.get(j + 1);
                  for (double value : values) {
                    updateSketch.update(value);
                  }
                }
              }
            }
            break;
          case STRING:
            String[][] stringValues = (String[][]) valueArrays[0];
            for (int i = 0; i < length; i++) {
              List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
              String[] values = stringValues[i];
              if (_includeDefaultSketch) {
                UpdateSketch defaultSketch = updateSketches.get(0);
                for (String value : values) {
                  defaultSketch.update(value);
                }
              }
              for (int j = 0; j < numFilters; j++) {
                if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
                  UpdateSketch updateSketch = updateSketches.get(j + 1);
                  for (String value : values) {
                    updateSketch.update(value);
                  }
                }
              }
            }
            break;
          default:
            throw new IllegalStateException(
                "Illegal multi-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: " + valueTypes[0]);
        }
      }
    } else {
      // Serialized sketch
      Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], length);
      for (int i = 0; i < length; i++) {
        List<ThetaSketchAccumulator> thetaSketchAccumulators = getUnions(groupByResultHolder, groupKeyArray[i]);
        Sketch sketch = sketches[i];
        if (_includeDefaultSketch) {
          thetaSketchAccumulators.get(0).apply(sketch);
        }
        for (int j = 0; j < numFilters; j++) {
          if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
            thetaSketchAccumulators.get(j + 1).apply(sketch);
          }
        }
      }
    }
  }