private static void serializeColumnData()

in pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java [245:456]


  /**
   * Serializes the values of a single column into the data block buffers.
   *
   * <p>The column is read from {@code columns.get(colId)} and its rows are written in row order. How a value is
   * written depends on the stored type of the column in {@code dataSchema}:
   * <ul>
   *   <li>INT, LONG, FLOAT and DOUBLE values are put directly into {@code fixedSize}.</li>
   *   <li>STRING values are replaced by their id in {@code dictionary} and the id is put into {@code fixedSize}
   *   as an int. A string without an id yet is assigned the dictionary size at that moment.</li>
   *   <li>BIG_DECIMAL, BYTES, MAP and the array types are handed to the matching {@code setColumn} overload
   *   together with both buffers (STRING_ARRAY additionally passes {@code dictionary}).</li>
   *   <li>OBJECT values are converted with {@code aggFunction.serializeIntermediateResult} and handed to
   *   {@code setColumn}; null OBJECT values go through {@code setNull}.</li>
   *   <li>UNKNOWN columns issue one {@code setNull} call per row regardless of the row content.</li>
   * </ul>
   *
   * <p>For every stored type except OBJECT and UNKNOWN, a null value adds its row id to {@code nullBitmap} and the
   * stored type's null placeholder is written in its place.
   *
   * @param columns column-major values; only the entry at {@code colId} is read
   * @param dataSchema schema used to look up the stored type (and, for error reporting, the name) of the column
   * @param colId index of the column to serialize
   * @param fixedSize buffer receiving the directly written primitive values and dictionary ids
   * @param varSize stream passed to the {@code setColumn}/{@code setNull} helpers
   *     (NOTE(review): presumably holds the variable-length payloads - confirm against the helpers)
   * @param nullBitmap collects the row ids of null values
   * @param dictionary string-to-id mapping, extended in place when new strings are encountered
   * @param aggFunction aggregation function used to serialize OBJECT values; only dereferenced for OBJECT columns
   * @throws IOException declared for the buffer-writing helpers (NOTE(review): presumably raised when writing to
   *     {@code varSize} fails - confirm)
   * @throws IllegalStateException if the stored type of the column is not handled by this method
   * @throws ClassCastException if a non-null value does not have the Java type expected for the stored type
   */
  private static void serializeColumnData(List<Object[]> columns, DataSchema dataSchema, int colId,
      ByteBuffer fixedSize, PagedPinotOutputStream varSize, RoaringBitmap nullBitmap,
      Object2IntOpenHashMap<String> dictionary, @Nullable AggregationFunction aggFunction)
      throws IOException {
    ColumnDataType storedType = dataSchema.getColumnDataType(colId).getStoredType();
    Object[] values = columns.get(colId);
    int rowCount = values.length;

    // NOTE:
    // The casts below are deliberately strict (e.g. an INT column only accepts Integer values) so that rows which do
    // not conform to the data schema fail fast instead of being silently converted.
    //
    // Each loop first picks what to write (the real value, or the null placeholder after marking the row in the
    // null bitmap) and then performs exactly one write per row.
    switch (storedType) {
      // ---- Single-value columns ----
      case INT: {
        int placeholder = (int) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          int toWrite;
          if (cell != null) {
            toWrite = (int) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          fixedSize.putInt(toWrite);
        }
        break;
      }
      case LONG: {
        long placeholder = (long) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          long toWrite;
          if (cell != null) {
            toWrite = (long) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          fixedSize.putLong(toWrite);
        }
        break;
      }
      case FLOAT: {
        float placeholder = (float) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          float toWrite;
          if (cell != null) {
            toWrite = (float) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          fixedSize.putFloat(toWrite);
        }
        break;
      }
      case DOUBLE: {
        double placeholder = (double) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          double toWrite;
          if (cell != null) {
            toWrite = (double) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          fixedSize.putDouble(toWrite);
        }
        break;
      }
      case BIG_DECIMAL: {
        BigDecimal placeholder = (BigDecimal) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          BigDecimal toWrite;
          if (cell != null) {
            toWrite = (BigDecimal) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          setColumn(fixedSize, varSize, toWrite);
        }
        break;
      }
      case STRING: {
        // A string that is not in the dictionary yet gets the current dictionary size as its id.
        ToIntFunction<String> nextId = unusedKey -> dictionary.size();
        // Register the null placeholder up front so that null rows can reuse its id.
        int nullDictId = dictionary.computeIfAbsent((String) storedType.getNullPlaceholder(), nextId);

        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          int idToWrite;
          if (cell != null) {
            idToWrite = dictionary.computeIfAbsent((String) cell, nextId);
          } else {
            nullBitmap.add(row);
            idToWrite = nullDictId;
          }
          fixedSize.putInt(idToWrite);
        }
        break;
      }
      case BYTES: {
        ByteArray placeholder = (ByteArray) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          ByteArray toWrite;
          if (cell != null) {
            toWrite = (ByteArray) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          setColumn(fixedSize, varSize, toWrite);
        }
        break;
      }
      case MAP: {
        Map placeholder = (Map) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          Map toWrite;
          if (cell != null) {
            toWrite = (Map) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          setColumn(fixedSize, varSize, toWrite);
        }
        break;
      }
      // ---- Multi-value columns ----
      case INT_ARRAY: {
        int[] placeholder = (int[]) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          int[] toWrite;
          if (cell != null) {
            toWrite = (int[]) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          setColumn(fixedSize, varSize, toWrite);
        }
        break;
      }
      case LONG_ARRAY: {
        long[] placeholder = (long[]) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          long[] toWrite;
          if (cell != null) {
            toWrite = (long[]) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          setColumn(fixedSize, varSize, toWrite);
        }
        break;
      }
      case FLOAT_ARRAY: {
        float[] placeholder = (float[]) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          float[] toWrite;
          if (cell != null) {
            toWrite = (float[]) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          setColumn(fixedSize, varSize, toWrite);
        }
        break;
      }
      case DOUBLE_ARRAY: {
        double[] placeholder = (double[]) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          double[] toWrite;
          if (cell != null) {
            toWrite = (double[]) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          setColumn(fixedSize, varSize, toWrite);
        }
        break;
      }
      case STRING_ARRAY: {
        String[] placeholder = (String[]) storedType.getNullPlaceholder();
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          String[] toWrite;
          if (cell != null) {
            toWrite = (String[]) cell;
          } else {
            nullBitmap.add(row);
            toWrite = placeholder;
          }
          setColumn(fixedSize, varSize, toWrite, dictionary);
        }
        break;
      }
      // ---- Custom intermediate result of an aggregation function ----
      case OBJECT: {
        assert aggFunction != null;
        // Unlike the typed columns above, null objects are written via setNull and are not added to the null bitmap.
        for (int row = 0; row < rowCount; row++) {
          Object cell = values[row];
          if (cell != null) {
            setColumn(fixedSize, varSize, aggFunction.serializeIntermediateResult(cell));
          } else {
            setNull(fixedSize, varSize);
          }
        }
        break;
      }
      // ---- Null column ----
      case UNKNOWN: {
        // The row content is never inspected: one null entry is written per row.
        for (int remaining = rowCount; remaining > 0; remaining--) {
          setNull(fixedSize, varSize);
        }
        break;
      }

      default:
        throw new IllegalStateException(
            "Unsupported stored type: " + storedType + " for column: " + dataSchema.getColumnName(colId));
    }
  }