in pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java [587:808]
/**
 * Creates the V1 indices (forward index, plus dictionary when enabled) for a derived column whose values are computed
 * by evaluating {@code functionEvaluator} over the values of its argument columns, one document at a time.
 *
 * <p>Documents for which the function returns {@code null} (or throws, when {@code errorOnFailure} is {@code false})
 * get the field's default null value, and are additionally recorded in the null value vector when the field is
 * nullable. The output value type is inferred from the first non-null function result and used to convert all results
 * to the column's stored type.
 *
 * @param column name of the derived column to create
 * @param functionEvaluator evaluator that computes the derived value from the argument values
 * @param argumentsMetadata metadata of the argument columns; their values are passed to {@code functionEvaluator}
 *                          positionally, in this list's order
 * @param errorOnFailure if {@code true}, the first function evaluation exception is rethrown; otherwise evaluation
 *                       errors are logged at debug level, counted, summarized in a single warning (with the first
 *                       failing input tuple), and the affected documents fall back to the default null value
 * @throws Exception if a function evaluation fails while {@code errorOnFailure} is {@code true}, or any exception
 *                   propagated while reading the argument values or creating the indices
 */
private void createDerivedColumnV1Indices(String column, FunctionEvaluator functionEvaluator,
    List<ColumnMetadata> argumentsMetadata, boolean errorOnFailure)
    throws Exception {
  int numArguments = argumentsMetadata.size();
  List<ValueReader> valueReaders = new ArrayList<>(numArguments);
  FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
  NullValueVectorCreator nullValueVectorCreator = null;
  if (isNullable(fieldSpec)) {
    nullValueVectorCreator = new NullValueVectorCreator(_indexDir, fieldSpec.getName());
  }
  // Every evaluation error is logged at debug level; only the first one (and its input tuple) is kept for the single
  // summary warning emitted after all documents are processed
  int functionEvaluateErrorCount = 0;
  Exception functionEvalError = null;
  Object[] inputValuesWithError = null;
  try {
    // Open the value readers inside the try block so that readers already opened are closed in the finally block
    // even if opening a later one fails
    for (ColumnMetadata argumentMetadata : argumentsMetadata) {
      valueReaders.add(new ValueReader(argumentMetadata));
    }

    // Calculate the values for the derived column
    Object[] inputValues = new Object[numArguments];
    int numDocs = _segmentMetadata.getTotalDocs();
    Object[] outputValues = new Object[numDocs];
    PinotDataType outputValueType = null;
    for (int i = 0; i < numDocs; i++) {
      for (int j = 0; j < numArguments; j++) {
        inputValues[j] = valueReaders.get(j).getValue(i);
      }
      Object outputValue = null;
      try {
        outputValue = functionEvaluator.evaluate(inputValues);
      } catch (Exception e) {
        if (!errorOnFailure) {
          LOGGER.debug("Encountered an exception while evaluating function {} for derived column {} with "
              + "arguments: {}", functionEvaluator, column, Arrays.toString(inputValues), e);
          functionEvaluateErrorCount++;
          if (functionEvalError == null) {
            functionEvalError = e;
            // inputValues is reused for every document, so keep a snapshot of the failing tuple
            inputValuesWithError = Arrays.copyOf(inputValues, inputValues.length);
          }
        } else {
          throw e;
        }
      }
      if (outputValue == null) {
        outputValue = fieldSpec.getDefaultNullValue();
        if (nullValueVectorCreator != null) {
          // Add doc to null vector index if the column / table has null handling enabled
          nullValueVectorCreator.setNull(i);
        }
      } else if (outputValueType == null) {
        // The output type is inferred once, from the first non-null result
        Class<?> outputValueClass = outputValue.getClass();
        outputValueType = FunctionUtils.getArgumentType(outputValueClass);
        Preconditions.checkState(outputValueType != null, "Unsupported output value class: %s", outputValueClass);
      }
      outputValues[i] = outputValue;
    }
    if (functionEvaluateErrorCount > 0) {
      LOGGER.warn("Caught {} exceptions while evaluating derived column: {} with function: {}. The first input value "
          + "tuple that led to an error is: {}", functionEvaluateErrorCount, column, functionEvaluator,
          Arrays.toString(inputValuesWithError), functionEvalError);
    }
    if (nullValueVectorCreator != null) {
      nullValueVectorCreator.seal();
    }

    FieldIndexConfigs fieldIndexConfigs = _indexLoadingConfig.getFieldIndexConfig(column);
    DictionaryIndexConfig dictionaryIndexConfig =
        fieldIndexConfigs != null ? fieldIndexConfigs.getConfig(StandardIndexes.dictionary())
            : DictionaryIndexConfig.DEFAULT;
    boolean createDictionary = dictionaryIndexConfig.isEnabled();
    StatsCollectorConfig statsCollectorConfig =
        new StatsCollectorConfig(_indexLoadingConfig.getTableConfig(), _schema, null);
    ColumnIndexCreationInfo indexCreationInfo;
    boolean isSingleValue = fieldSpec.isSingleValueField();
    // Convert every output value to the column's stored type, then collect stats with the matching collector
    switch (fieldSpec.getDataType().getStoredType()) {
      case INT: {
        for (int i = 0; i < numDocs; i++) {
          outputValues[i] = getIntOutputValue(outputValues[i], isSingleValue, outputValueType,
              (Integer) fieldSpec.getDefaultNullValue(), createDictionary);
        }
        IntColumnPreIndexStatsCollector statsCollector =
            new IntColumnPreIndexStatsCollector(column, statsCollectorConfig);
        for (Object value : outputValues) {
          statsCollector.collect(value);
        }
        statsCollector.seal();
        indexCreationInfo =
            new ColumnIndexCreationInfo(statsCollector, createDictionary, false, true,
                fieldSpec.getDefaultNullValue());
        break;
      }
      case LONG: {
        for (int i = 0; i < numDocs; i++) {
          outputValues[i] = getLongOutputValue(outputValues[i], isSingleValue, outputValueType,
              (Long) fieldSpec.getDefaultNullValue(), createDictionary);
        }
        LongColumnPreIndexStatsCollector statsCollector =
            new LongColumnPreIndexStatsCollector(column, statsCollectorConfig);
        for (Object value : outputValues) {
          statsCollector.collect(value);
        }
        statsCollector.seal();
        indexCreationInfo =
            new ColumnIndexCreationInfo(statsCollector, createDictionary, false, true,
                fieldSpec.getDefaultNullValue());
        break;
      }
      case FLOAT: {
        for (int i = 0; i < numDocs; i++) {
          outputValues[i] = getFloatOutputValue(outputValues[i], isSingleValue, outputValueType,
              (Float) fieldSpec.getDefaultNullValue(), createDictionary);
        }
        FloatColumnPreIndexStatsCollector statsCollector =
            new FloatColumnPreIndexStatsCollector(column, statsCollectorConfig);
        for (Object value : outputValues) {
          statsCollector.collect(value);
        }
        statsCollector.seal();
        indexCreationInfo =
            new ColumnIndexCreationInfo(statsCollector, createDictionary, false, true,
                fieldSpec.getDefaultNullValue());
        break;
      }
      case DOUBLE: {
        for (int i = 0; i < numDocs; i++) {
          outputValues[i] = getDoubleOutputValue(outputValues[i], isSingleValue, outputValueType,
              (Double) fieldSpec.getDefaultNullValue(), createDictionary);
        }
        DoubleColumnPreIndexStatsCollector statsCollector =
            new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig);
        for (Object value : outputValues) {
          statsCollector.collect(value);
        }
        statsCollector.seal();
        indexCreationInfo =
            new ColumnIndexCreationInfo(statsCollector, createDictionary, false, true,
                fieldSpec.getDefaultNullValue());
        break;
      }
      case BIG_DECIMAL: {
        // The single-value requirement does not depend on the document, so check it once up front
        Preconditions.checkState(isSingleValue, "MV BIG_DECIMAL is not supported");
        for (int i = 0; i < numDocs; i++) {
          // Skip type conversion if output value is already the required type. If outputValueType is null, that
          // means the transform function returned null for all docs and in that case outputValue will be the
          // default null value for the field type
          if (outputValueType != null && !(outputValues[i] instanceof BigDecimal)) {
            outputValues[i] = outputValueType.toBigDecimal(outputValues[i]);
          }
        }
        // The values are BigDecimal at this point, so use the BigDecimal stats collector so that the collected stats
        // (and the dictionary built from them) match the column's stored type
        org.apache.pinot.segment.local.segment.creator.impl.stats.BigDecimalColumnPreIndexStatsCollector
            statsCollector =
            new org.apache.pinot.segment.local.segment.creator.impl.stats.BigDecimalColumnPreIndexStatsCollector(
                column, statsCollectorConfig);
        for (Object value : outputValues) {
          statsCollector.collect(value);
        }
        statsCollector.seal();
        indexCreationInfo =
            new ColumnIndexCreationInfo(statsCollector, createDictionary, false, true,
                fieldSpec.getDefaultNullValue());
        break;
      }
      case STRING: {
        for (int i = 0; i < numDocs; i++) {
          outputValues[i] = getStringOutputValue(outputValues[i], isSingleValue, outputValueType,
              (String) fieldSpec.getDefaultNullValue());
        }
        StringColumnPreIndexStatsCollector statsCollector =
            new StringColumnPreIndexStatsCollector(column, statsCollectorConfig);
        for (Object value : outputValues) {
          statsCollector.collect(value);
        }
        statsCollector.seal();
        indexCreationInfo = new ColumnIndexCreationInfo(statsCollector, createDictionary,
            dictionaryIndexConfig.getUseVarLengthDictionary(), true, fieldSpec.getDefaultNullValue());
        break;
      }
      case BYTES: {
        for (int i = 0; i < numDocs; i++) {
          outputValues[i] = getBytesOutputValue(outputValues[i], isSingleValue, outputValueType,
              (byte[]) fieldSpec.getDefaultNullValue());
        }
        BytesColumnPredIndexStatsCollector statsCollector =
            new BytesColumnPredIndexStatsCollector(column, statsCollectorConfig);
        for (Object value : outputValues) {
          statsCollector.collect(value);
        }
        statsCollector.seal();
        // Values of differing lengths force a var-length dictionary; otherwise honor the configured setting
        boolean useVarLengthDictionary =
            !statsCollector.isFixedLength() || dictionaryIndexConfig.getUseVarLengthDictionary();
        indexCreationInfo =
            new ColumnIndexCreationInfo(statsCollector, createDictionary, useVarLengthDictionary, true,
                new ByteArray((byte[]) fieldSpec.getDefaultNullValue()));
        break;
      }
      default:
        throw new IllegalStateException(
            "Unsupported stored type: " + fieldSpec.getDataType().getStoredType() + " for derived column: " + column);
    }
    if (createDictionary) {
      createDerivedColumnForwardIndexWithDictionary(column, fieldSpec, outputValues, indexCreationInfo);
    } else {
      createDerivedColumnForwardIndexWithoutDictionary(column, fieldSpec, outputValues, indexCreationInfo);
    }
  } finally {
    for (ValueReader valueReader : valueReaders) {
      valueReader.close();
    }
  }
}