in pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java [665:941]
public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
int numExpressions = _inputExpressions.size();
boolean[] singleValues = new boolean[numExpressions];
DataType[] valueTypes = new DataType[numExpressions];
Object[] valueArrays = new Object[numExpressions];
extractValues(blockValSetMap, singleValues, valueTypes, valueArrays);
int numFilters = _filterEvaluators.size();
// Main expression is always index 0
if (valueTypes[0] != DataType.BYTES) {
if (singleValues[0]) {
switch (valueTypes[0]) {
case INT:
int[] intValues = (int[]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(0).update(intValues[i]);
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(i + 1).update(intValues[j]);
}
}
}
}
break;
case LONG:
long[] longValues = (long[]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(0).update(longValues[i]);
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(i + 1).update(longValues[j]);
}
}
}
}
break;
case FLOAT:
float[] floatValues = (float[]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(0).update(floatValues[i]);
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(i + 1).update(floatValues[j]);
}
}
}
}
break;
case DOUBLE:
double[] doubleValues = (double[]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(0).update(doubleValues[i]);
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(i + 1).update(doubleValues[j]);
}
}
}
}
break;
case STRING:
String[] stringValues = (String[]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(0).update(stringValues[i]);
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
getUpdateSketches(groupByResultHolder, groupKey).get(i + 1).update(stringValues[j]);
}
}
}
}
break;
default:
throw new IllegalStateException(
"Illegal single-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: "
+ valueTypes[0]);
}
} else {
switch (valueTypes[0]) {
case INT:
int[][] intValues = (int[][]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch defaultSketch = getUpdateSketches(groupByResultHolder, groupKey).get(0);
for (int value : intValues[i]) {
defaultSketch.update(value);
}
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch updateSketch = getUpdateSketches(groupByResultHolder, groupKey).get(i + 1);
for (int value : intValues[i]) {
updateSketch.update(value);
}
}
}
}
}
break;
case LONG:
long[][] longValues = (long[][]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch defaultSketch = getUpdateSketches(groupByResultHolder, groupKey).get(0);
for (long value : longValues[i]) {
defaultSketch.update(value);
}
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch updateSketch = getUpdateSketches(groupByResultHolder, groupKey).get(i + 1);
for (long value : longValues[i]) {
updateSketch.update(value);
}
}
}
}
}
break;
case FLOAT:
float[][] floatValues = (float[][]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch defaultSketch = getUpdateSketches(groupByResultHolder, groupKey).get(0);
for (float value : floatValues[i]) {
defaultSketch.update(value);
}
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch updateSketch = getUpdateSketches(groupByResultHolder, groupKey).get(i + 1);
for (float value : floatValues[i]) {
updateSketch.update(value);
}
}
}
}
}
break;
case DOUBLE:
double[][] doubleValues = (double[][]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch defaultSketch = getUpdateSketches(groupByResultHolder, groupKey).get(0);
for (double value : doubleValues[i]) {
defaultSketch.update(value);
}
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch updateSketch = getUpdateSketches(groupByResultHolder, groupKey).get(i + 1);
for (double value : doubleValues[i]) {
updateSketch.update(value);
}
}
}
}
}
break;
case STRING:
String[][] stringValues = (String[][]) valueArrays[0];
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch defaultSketch = getUpdateSketches(groupByResultHolder, groupKey).get(0);
for (String value : stringValues[i]) {
defaultSketch.update(value);
}
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
UpdateSketch updateSketch = getUpdateSketches(groupByResultHolder, groupKey).get(i + 1);
for (String value : stringValues[i]) {
updateSketch.update(value);
}
}
}
}
}
break;
default:
throw new IllegalStateException(
"Illegal multi-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: " + valueTypes[0]);
}
}
} else {
// Serialized sketch
Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], length);
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
getUnions(groupByResultHolder, groupKey).get(0).apply(sketches[i]);
}
}
}
for (int i = 0; i < numFilters; i++) {
FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
for (int j = 0; j < length; j++) {
if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
for (int groupKey : groupKeysArray[i]) {
getUnions(groupByResultHolder, groupKey).get(i + 1).apply(sketches[i]);
}
}
}
}
}
}