in processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java [126:341]
/**
 * Merges the value dictionaries of column {@code name} across all {@code adapters}, picks the most specialized
 * serializer that the merged type information allows, writes the merged field type information and the merged
 * value dictionaries, and finally opens the serializer.
 * <p>
 * Side effects: sets {@code logicalType} and {@code serializer}. For mixed-type, root-only columns it also sets
 * {@code isVariantType} and {@code variantTypeByte}. Every mergeable returned by an adapter is registered with
 * {@code closer}.
 *
 * @param adapters adapters being merged; adapters that return no mergeable for this column are skipped
 * @throws IOException if writing the dictionaries fails; the failure is logged before being rethrown
 */
public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
{
  try {
    long dimStartTime = System.currentTimeMillis();
    int numMergeIndex = 0;
    SortedValueDictionary sortedLookup = null;
    final Indexed[] sortedLookups = new Indexed[adapters.size()];
    final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
    final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
    final Indexed<Object[]>[] sortedArrayLookups = new Indexed[adapters.size()];
    final SortedMap<String, FieldTypeInfo.MutableTypeSet> mergedFields = new TreeMap<>();
    boolean forceNested = false;
    Object constantValue = null;
    // true once constantValue has been taken from at least one constant adapter
    boolean sawConstant = false;
    boolean hasArrays = false;
    boolean isConstant = true;
    for (int i = 0; i < adapters.size(); i++) {
      final IndexableAdapter adapter = adapters.get(i);
      final IndexableAdapter.NestedColumnMergable mergable = closer.register(
          adapter.getNestedColumnMergeables(name)
      );
      if (mergable == null) {
        continue;
      }
      forceNested = forceNested || mergable.isForceNestedType();
      if (mergable.isConstant()) {
        final Object adapterConstantValue = mergable.getConstantValue();
        // the merged column is only constant if every adapter holds the *same* constant value; otherwise the
        // outcome would depend on adapter order (e.g. a constant 5 followed by a constant null would be
        // mistaken for an all-null column)
        if (sawConstant && !java.util.Objects.deepEquals(constantValue, adapterConstantValue)) {
          isConstant = false;
        }
        constantValue = adapterConstantValue;
        sawConstant = true;
      } else {
        isConstant = false;
      }
      final SortedValueDictionary dimValues = mergable.getValueDictionary();
      boolean allNulls = dimValues == null || dimValues.allNull();
      if (!allNulls) {
        sortedLookup = dimValues;
        mergable.mergeFieldsInto(mergedFields);
        sortedLookups[i] = dimValues.getSortedStrings();
        sortedLongLookups[i] = dimValues.getSortedLongs();
        sortedDoubleLookups[i] = dimValues.getSortedDoubles();
        sortedArrayLookups[i] = dimValues.getSortedArrays();
        // accumulate across adapters; a later adapter without arrays must not reset the flag
        hasArrays = hasArrays || sortedArrayLookups[i].size() > 0;
        numMergeIndex++;
      }
    }
    // check to see if we can specialize the serializer after merging all the adapters
    final FieldTypeInfo.MutableTypeSet rootTypes = mergedFields.get(NestedPathFinder.JSON_PATH_ROOT);
    final boolean rootOnly = mergedFields.size() == 1 && rootTypes != null;
    // only primitive or primitive array cast types force a specialized serializer
    final ColumnType explicitType;
    if (castToType != null && (castToType.isPrimitive() || castToType.isPrimitiveArray())) {
      explicitType = castToType;
    } else {
      explicitType = null;
    }
    // for backwards compat; remove this constant handling in druid 28 along with
    // indexSpec.optimizeJsonConstantColumns in favor of always writing constant columns
    // we also handle the numMergeIndex == 0 here, which also indicates that the column is a null constant
    if (explicitType == null && !forceNested && ((isConstant && constantValue == null) || numMergeIndex == 0)) {
      logicalType = ColumnType.STRING;
      serializer = new ScalarStringColumnSerializer(
          outputName,
          indexSpec,
          segmentWriteOutMedium,
          closer
      );
    } else if (explicitType != null || (!forceNested && rootOnly && rootTypes.getSingleType() != null)) {
      logicalType = explicitType != null ? explicitType : rootTypes.getSingleType();
      // empty arrays can be missed since they don't have a type, so handle them here
      if (!logicalType.isArray() && hasArrays) {
        logicalType = ColumnTypeFactory.getInstance().ofArray(logicalType);
      }
      switch (logicalType.getType()) {
        case LONG:
          serializer = new ScalarLongColumnSerializer(
              outputName,
              indexSpec,
              segmentWriteOutMedium,
              closer
          );
          break;
        case DOUBLE:
          serializer = new ScalarDoubleColumnSerializer(
              outputName,
              indexSpec,
              segmentWriteOutMedium,
              closer
          );
          break;
        case STRING:
          serializer = new ScalarStringColumnSerializer(
              outputName,
              indexSpec,
              segmentWriteOutMedium,
              closer
          );
          break;
        case ARRAY:
          serializer = new VariantColumnSerializer(
              outputName,
              logicalType,
              null,
              indexSpec,
              segmentWriteOutMedium,
              closer
          );
          break;
        default:
          throw DruidException.defensive(
              "How did we get here? Column [%s] with type [%s] does not have specialized serializer",
              name,
              logicalType
          );
      }
    } else if (!forceNested && rootOnly) {
      // mixed type column, but only root path, we can use VariantColumnSerializer
      // pick the least restrictive type for the logical type
      isVariantType = true;
      variantTypeByte = rootTypes.getByteValue();
      for (ColumnType type : FieldTypeInfo.convertToSet(rootTypes.getByteValue())) {
        logicalType = ColumnType.leastRestrictiveType(logicalType, type);
      }
      // empty arrays can be missed since they don't have a type, so handle them here
      if (!logicalType.isArray() && hasArrays) {
        logicalType = ColumnTypeFactory.getInstance().ofArray(logicalType);
      }
      serializer = new VariantColumnSerializer(
          outputName,
          null,
          variantTypeByte,
          indexSpec,
          segmentWriteOutMedium,
          closer
      );
    } else {
      // all the bells and whistles
      logicalType = ColumnType.NESTED_DATA;
      serializer = new NestedDataColumnSerializer(
          outputName,
          indexSpec,
          segmentWriteOutMedium,
          closer
      );
    }
    serializer.openDictionaryWriter(segmentBaseDir);
    serializer.serializeFields(mergedFields);
    int stringCardinality;
    int longCardinality;
    int doubleCardinality;
    int arrayCardinality;
    if (numMergeIndex == 1) {
      // a single adapter contributed values: its dictionaries are already sorted, so write them directly
      serializer.serializeDictionaries(
          sortedLookup.getSortedStrings(),
          sortedLookup.getSortedLongs(),
          sortedLookup.getSortedDoubles(),
          () -> new ArrayDictionaryMergingIterator(
              sortedArrayLookups,
              serializer.getDictionaryIdLookup()
          )
      );
      stringCardinality = sortedLookup.getStringCardinality();
      longCardinality = sortedLookup.getLongCardinality();
      doubleCardinality = sortedLookup.getDoubleCardinality();
      arrayCardinality = sortedLookup.getArrayCardinality();
    } else {
      // zero or several adapters contributed values: merge the per-adapter sorted dictionaries
      final SimpleDictionaryMergingIterator<String> stringIterator = new SimpleDictionaryMergingIterator<>(
          sortedLookups,
          STRING_MERGING_COMPARATOR
      );
      final SimpleDictionaryMergingIterator<Long> longIterator = new SimpleDictionaryMergingIterator<>(
          sortedLongLookups,
          LONG_MERGING_COMPARATOR
      );
      final SimpleDictionaryMergingIterator<Double> doubleIterator = new SimpleDictionaryMergingIterator<>(
          sortedDoubleLookups,
          DOUBLE_MERGING_COMPARATOR
      );
      final ArrayDictionaryMergingIterator arrayIterator = new ArrayDictionaryMergingIterator(
          sortedArrayLookups,
          serializer.getDictionaryIdLookup()
      );
      serializer.serializeDictionaries(
          () -> stringIterator,
          () -> longIterator,
          () -> doubleIterator,
          () -> arrayIterator
      );
      // cardinalities are read after serializeDictionaries has consumed the iterators
      stringCardinality = stringIterator.getCardinality();
      longCardinality = longIterator.getCardinality();
      doubleCardinality = doubleIterator.getCardinality();
      arrayCardinality = arrayIterator.getCardinality();
    }
    // open main serializer after dictionaries have been serialized. we can't do this earlier since we don't know
    // dictionary cardinalities until after merging them, and we need to know that to configure compression and such
    // which depend on knowing the highest value
    serializer.open();
    log.debug(
        "Completed dim[%s] conversions with string cardinality[%,d], long cardinality[%,d], double cardinality[%,d], array cardinality[%,d] in %,d millis.",
        name,
        stringCardinality,
        longCardinality,
        doubleCardinality,
        arrayCardinality,
        System.currentTimeMillis() - dimStartTime
    );
  }
  catch (IOException ioe) {
    log.error(ioe, "Failed to merge dictionary for column [%s]", name);
    throw ioe;
  }
}