public void writeMergedValueDictionary()

in processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java [126:341]
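
Merges the value dictionaries of an automatically typed (nested) column across all segments being merged. The method gathers each adapter's sorted string, long, double, and array dictionaries along with its nested field type info, then selects the most specialized serializer the merged data allows: a constant-null string column, a type-specialized scalar or array column, a variant (mixed root-type) column, or the general nested data column. Finally it writes the merged global dictionaries and records the resulting cardinalities.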


  public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
  {
    try {
      long dimStartTime = System.currentTimeMillis();

      int numMergeIndex = 0;
      SortedValueDictionary sortedLookup = null;
      final Indexed<String>[] sortedLookups = new Indexed[adapters.size()];
      final Indexed<Long>[] sortedLongLookups = new Indexed[adapters.size()];
      final Indexed<Double>[] sortedDoubleLookups = new Indexed[adapters.size()];
      final Indexed<Object[]>[] sortedArrayLookups = new Indexed[adapters.size()];

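      // union of nested field paths -> the set of value types observed for each path across all adapters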
      final SortedMap<String, FieldTypeInfo.MutableTypeSet> mergedFields = new TreeMap<>();

      boolean forceNested = false;
      Object constantValue = null;
      boolean hasArrays = false;
      boolean isConstant = true;

      for (int i = 0; i < adapters.size(); i++) {
        final IndexableAdapter adapter = adapters.get(i);
        final IndexableAdapter.NestedColumnMergable mergable = closer.register(
            adapter.getNestedColumnMergeables(name)
        );
        if (mergable == null) {
          continue;
        }
        forceNested = forceNested || mergable.isForceNestedType();
        isConstant = isConstant && mergable.isConstant();
        constantValue = mergable.getConstantValue();

        final SortedValueDictionary dimValues = mergable.getValueDictionary();

        boolean allNulls = dimValues == null || dimValues.allNull();
        if (!allNulls) {
          sortedLookup = dimValues;
          mergable.mergeFieldsInto(mergedFields);
          sortedLookups[i] = dimValues.getSortedStrings();
          sortedLongLookups[i] = dimValues.getSortedLongs();
          sortedDoubleLookups[i] = dimValues.getSortedDoubles();
          sortedArrayLookups[i] = dimValues.getSortedArrays();
          hasArrays = hasArrays || sortedArrayLookups[i].size() > 0;
          numMergeIndex++;
        }
      }

      // check to see if we can specialize the serializer after merging all the adapters
      final FieldTypeInfo.MutableTypeSet rootTypes = mergedFields.get(NestedPathFinder.JSON_PATH_ROOT);
      final boolean rootOnly = mergedFields.size() == 1 && rootTypes != null;

      final ColumnType explicitType;
      if (castToType != null && (castToType.isPrimitive() || castToType.isPrimitiveArray())) {
        explicitType = castToType;
      } else {
        explicitType = null;
      }

      // for backwards compat; remove this constant handling in Druid 28 along with
      // indexSpec.optimizeJsonConstantColumns in favor of always writing constant columns.
      // we also handle numMergeIndex == 0 here, which likewise indicates that the column is a null constant
      if (explicitType == null && !forceNested && ((isConstant && constantValue == null) || numMergeIndex == 0)) {
        logicalType = ColumnType.STRING;
        serializer = new ScalarStringColumnSerializer(
            outputName,
            indexSpec,
            segmentWriteOutMedium,
            closer
        );
      } else if (explicitType != null || (!forceNested && rootOnly && rootTypes.getSingleType() != null)) {
        logicalType = explicitType != null ? explicitType : rootTypes.getSingleType();
        // empty arrays can be missed since they don't have a type, so handle them here
        if (!logicalType.isArray() && hasArrays) {
          logicalType = ColumnTypeFactory.getInstance().ofArray(logicalType);
        }
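        // single logical type: use the type-specialized scalar serializer, or the variant serializer for arrays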
        switch (logicalType.getType()) {
          case LONG:
            serializer = new ScalarLongColumnSerializer(
                outputName,
                indexSpec,
                segmentWriteOutMedium,
                closer
            );
            break;
          case DOUBLE:
            serializer = new ScalarDoubleColumnSerializer(
                outputName,
                indexSpec,
                segmentWriteOutMedium,
                closer
            );
            break;
          case STRING:
            serializer = new ScalarStringColumnSerializer(
                outputName,
                indexSpec,
                segmentWriteOutMedium,
                closer
            );
            break;
          case ARRAY:
            serializer = new VariantColumnSerializer(
                outputName,
                logicalType,
                null,
                indexSpec,
                segmentWriteOutMedium,
                closer
            );
            break;
          default:
            throw DruidException.defensive(
                "How did we get here? Column [%s] with type [%s] does not have specialized serializer",
                name,
                logicalType
            );
        }
      } else if (!forceNested && rootOnly) {
        // mixed type column, but only root path, so we can use VariantColumnSerializer
        // pick the least restrictive type for the logical type
        isVariantType = true;
        variantTypeByte = rootTypes.getByteValue();
        for (ColumnType type : FieldTypeInfo.convertToSet(rootTypes.getByteValue())) {
          logicalType = ColumnType.leastRestrictiveType(logicalType, type);
        }
        // empty arrays can be missed since they don't have a type, so handle them here
        if (!logicalType.isArray() && hasArrays) {
          logicalType = ColumnTypeFactory.getInstance().ofArray(logicalType);
        }
        serializer = new VariantColumnSerializer(
            outputName,
            null,
            variantTypeByte,
            indexSpec,
            segmentWriteOutMedium,
            closer
        );
      } else {
        // all the bells and whistles
        logicalType = ColumnType.NESTED_DATA;
        serializer = new NestedDataColumnSerializer(
            outputName,
            indexSpec,
            segmentWriteOutMedium,
            closer
        );
      }

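      // open the dictionary writer, then write the merged nested field paths with their type sets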
      serializer.openDictionaryWriter(segmentBaseDir);
      serializer.serializeFields(mergedFields);

      int stringCardinality;
      int longCardinality;
      int doubleCardinality;
      int arrayCardinality;
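      // a single contributing adapter already has sorted, de-duplicated dictionaries, so write them directly;
      // with multiple adapters, merge the per-adapter sorted dictionaries to build the global value dictionaries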
      if (numMergeIndex == 1) {
        serializer.serializeDictionaries(
            sortedLookup.getSortedStrings(),
            sortedLookup.getSortedLongs(),
            sortedLookup.getSortedDoubles(),
            () -> new ArrayDictionaryMergingIterator(
                sortedArrayLookups,
                serializer.getDictionaryIdLookup()
            )
        );
        stringCardinality = sortedLookup.getStringCardinality();
        longCardinality = sortedLookup.getLongCardinality();
        doubleCardinality = sortedLookup.getDoubleCardinality();
        arrayCardinality = sortedLookup.getArrayCardinality();
      } else {
        final SimpleDictionaryMergingIterator<String> stringIterator = new SimpleDictionaryMergingIterator<>(
            sortedLookups,
            STRING_MERGING_COMPARATOR
        );
        final SimpleDictionaryMergingIterator<Long> longIterator = new SimpleDictionaryMergingIterator<>(
            sortedLongLookups,
            LONG_MERGING_COMPARATOR
        );
        final SimpleDictionaryMergingIterator<Double> doubleIterator = new SimpleDictionaryMergingIterator<>(
            sortedDoubleLookups,
            DOUBLE_MERGING_COMPARATOR
        );
        final ArrayDictionaryMergingIterator arrayIterator = new ArrayDictionaryMergingIterator(
            sortedArrayLookups,
            serializer.getDictionaryIdLookup()
        );
        serializer.serializeDictionaries(
            () -> stringIterator,
            () -> longIterator,
            () -> doubleIterator,
            () -> arrayIterator
        );
        stringCardinality = stringIterator.getCardinality();
        longCardinality = longIterator.getCardinality();
        doubleCardinality = doubleIterator.getCardinality();
        arrayCardinality = arrayIterator.getCardinality();
      }
      // open main serializer after dictionaries have been serialized. we can't do this earlier since we don't know
      // dictionary cardinalities until after merging them, and we need to know that to configure compression and
      // other settings which depend on knowing the highest value
      serializer.open();

      log.debug(
          "Completed dim[%s] conversions with string cardinality[%,d], long cardinality[%,d], double cardinality[%,d], array cardinality[%,d] in %,d millis.",
          name,
          stringCardinality,
          longCardinality,
          doubleCardinality,
          arrayCardinality,
          System.currentTimeMillis() - dimStartTime
      );
    }
    catch (IOException ioe) {
      log.error(ioe, "Failed to merge dictionary for column [%s]", name);
      throw ioe;
    }
  }
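
For reference, the multi-adapter branch above builds the global dictionaries by k-way merging per-segment dictionaries that are each already sorted and de-duplicated. The sketch below illustrates that merge with a priority queue of peeking cursors. It is a minimal, self-contained example: the class, method, and field names are illustrative only and are not Druid's SimpleDictionaryMergingIterator API.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Illustrative k-way merge of several sorted, de-duplicated value lists into one sorted,
 * de-duplicated list, as the dictionary merging iterators above do for each value type.
 * Assumes inputs contain no nulls; Druid's real null handling is out of scope here.
 */
public class SortedDictionaryMergeSketch
{
  public static <T> List<T> mergeSorted(List<List<T>> sortedInputs, Comparator<T> comparator)
  {
    // one cursor per non-empty input, ordered by the value each cursor currently points at
    final PriorityQueue<PeekingCursor<T>> pq = new PriorityQueue<>(
        Math.max(1, sortedInputs.size()),
        (a, b) -> comparator.compare(a.peek(), b.peek())
    );
    for (List<T> input : sortedInputs) {
      if (!input.isEmpty()) {
        pq.add(new PeekingCursor<>(input.iterator()));
      }
    }

    final List<T> merged = new ArrayList<>();
    T previous = null;
    while (!pq.isEmpty()) {
      final PeekingCursor<T> cursor = pq.poll();
      final T value = cursor.next();
      // output is globally sorted, so duplicates across inputs are adjacent and easy to skip
      if (previous == null || comparator.compare(previous, value) != 0) {
        merged.add(value);
        previous = value;
      }
      if (cursor.hasNext()) {
        pq.add(cursor);
      }
    }
    // merged.size() plays the role of the merged cardinality reported in the log line above
    return merged;
  }

  // small peeking wrapper so the priority queue can order cursors by their next value
  private static final class PeekingCursor<T>
  {
    private final Iterator<T> delegate;
    private T head;

    PeekingCursor(Iterator<T> delegate)
    {
      this.delegate = delegate;
      this.head = delegate.next();
    }

    T peek()
    {
      return head;
    }

    T next()
    {
      final T current = head;
      head = delegate.hasNext() ? delegate.next() : null;
      return current;
    }

    boolean hasNext()
    {
      return head != null;
    }
  }
}

A caller would pass, for example, each adapter's sorted string dictionary together with a comparator playing the role of STRING_MERGING_COMPARATOR; the size of the merged result then corresponds to the string cardinality logged at the end of the method.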