in pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java [441:662]
public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
int numExpressions = _inputExpressions.size();
boolean[] singleValues = new boolean[numExpressions];
DataType[] valueTypes = new DataType[numExpressions];
Object[] valueArrays = new Object[numExpressions];
extractValues(blockValSetMap, singleValues, valueTypes, valueArrays);
int numFilters = _filterEvaluators.size();
// Main expression is always index 0
if (valueTypes[0] != DataType.BYTES) {
if (singleValues[0]) {
switch (valueTypes[0]) {
case INT:
int[] intValues = (int[]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
int value = intValues[i];
if (_includeDefaultSketch) {
updateSketches.get(0).update(value);
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
updateSketches.get(j + 1).update(value);
}
}
}
break;
case LONG:
long[] longValues = (long[]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
long value = longValues[i];
if (_includeDefaultSketch) {
updateSketches.get(0).update(value);
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
updateSketches.get(j + 1).update(value);
}
}
}
break;
case FLOAT:
float[] floatValues = (float[]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
float value = floatValues[i];
if (_includeDefaultSketch) {
updateSketches.get(0).update(value);
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
updateSketches.get(j + 1).update(value);
}
}
}
break;
case DOUBLE:
double[] doubleValues = (double[]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
double value = doubleValues[i];
if (_includeDefaultSketch) {
updateSketches.get(0).update(value);
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
updateSketches.get(j + 1).update(value);
}
}
}
break;
case STRING:
String[] stringValues = (String[]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
String value = stringValues[i];
if (_includeDefaultSketch) {
updateSketches.get(0).update(value);
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
updateSketches.get(j + 1).update(value);
}
}
}
break;
default:
throw new IllegalStateException(
"Illegal single-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: "
+ valueTypes[0]);
}
} else {
switch (valueTypes[0]) {
case INT:
int[][] intValues = (int[][]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
int[] values = intValues[i];
if (_includeDefaultSketch) {
UpdateSketch defaultSketch = updateSketches.get(0);
for (int value : values) {
defaultSketch.update(value);
}
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
UpdateSketch updateSketch = updateSketches.get(j + 1);
for (int value : values) {
updateSketch.update(value);
}
}
}
}
break;
case LONG:
long[][] longValues = (long[][]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
long[] values = longValues[i];
if (_includeDefaultSketch) {
UpdateSketch defaultSketch = updateSketches.get(0);
for (long value : values) {
defaultSketch.update(value);
}
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
UpdateSketch updateSketch = updateSketches.get(j + 1);
for (long value : values) {
updateSketch.update(value);
}
}
}
}
break;
case FLOAT:
float[][] floatValues = (float[][]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
float[] values = floatValues[i];
if (_includeDefaultSketch) {
UpdateSketch defaultSketch = updateSketches.get(0);
for (float value : values) {
defaultSketch.update(value);
}
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
UpdateSketch updateSketch = updateSketches.get(j + 1);
for (float value : values) {
updateSketch.update(value);
}
}
}
}
break;
case DOUBLE:
double[][] doubleValues = (double[][]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
double[] values = doubleValues[i];
if (_includeDefaultSketch) {
UpdateSketch defaultSketch = updateSketches.get(0);
for (double value : values) {
defaultSketch.update(value);
}
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
UpdateSketch updateSketch = updateSketches.get(j + 1);
for (double value : values) {
updateSketch.update(value);
}
}
}
}
break;
case STRING:
String[][] stringValues = (String[][]) valueArrays[0];
for (int i = 0; i < length; i++) {
List<UpdateSketch> updateSketches = getUpdateSketches(groupByResultHolder, groupKeyArray[i]);
String[] values = stringValues[i];
if (_includeDefaultSketch) {
UpdateSketch defaultSketch = updateSketches.get(0);
for (String value : values) {
defaultSketch.update(value);
}
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
UpdateSketch updateSketch = updateSketches.get(j + 1);
for (String value : values) {
updateSketch.update(value);
}
}
}
}
break;
default:
throw new IllegalStateException(
"Illegal multi-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: " + valueTypes[0]);
}
}
} else {
// Serialized sketch
Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], length);
for (int i = 0; i < length; i++) {
List<ThetaSketchAccumulator> thetaSketchAccumulators = getUnions(groupByResultHolder, groupKeyArray[i]);
Sketch sketch = sketches[i];
if (_includeDefaultSketch) {
thetaSketchAccumulators.get(0).apply(sketch);
}
for (int j = 0; j < numFilters; j++) {
if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
thetaSketchAccumulators.get(j + 1).apply(sketch);
}
}
}
}
}