in pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java [245:456]
/**
 * Writes every row of the column at index {@code colId} into the data block output.
 *
 * <p>INT, LONG, FLOAT and DOUBLE values, as well as the dictionary ids of STRING values, are written directly into
 * {@code fixedSize}. Every other supported type is handed to the matching {@code setColumn} overload, which receives
 * both {@code fixedSize} and {@code varSize}. For all of these typed cases a {@code null} cell is recorded in
 * {@code nullBitmap}, and the stored type's null placeholder is written in its place. OBJECT and UNKNOWN columns
 * instead emit {@code null} cells through {@code setNull} and never touch {@code nullBitmap}. For UNKNOWN, every
 * row is emitted that way.
 *
 * <p>Casts are intentionally exact (for example, an INT column only accepts {@link Integer}). Rows that do not
 * conform to the data schema therefore fail fast with a {@link ClassCastException}.
 *
 * @param columns column-major values; {@code columns.get(colId)} holds one entry per row
 * @param dataSchema schema used to resolve the column's stored type (and its name for error messages)
 * @param colId index of the column to serialize
 * @param fixedSize buffer receiving the fixed-width part of each row
 * @param varSize stream passed to the {@code setColumn} / {@code setNull} helpers for variable-width data
 * @param nullBitmap receives the ids of {@code null} rows (not used for OBJECT / UNKNOWN columns)
 * @param dictionary string-to-id dictionary used by STRING and STRING_ARRAY columns; in the STRING case, strings
 *     not yet present are assigned id {@code dictionary.size()}
 * @param aggFunction serializer for OBJECT cells; asserted non-null when the stored type is OBJECT
 * @throws IOException propagated from the {@code setColumn} / {@code setNull} helpers
 * @throws IllegalStateException if the column's stored type is not supported
 */
private static void serializeColumnData(List<Object[]> columns, DataSchema dataSchema, int colId,
    ByteBuffer fixedSize, PagedPinotOutputStream varSize, RoaringBitmap nullBitmap,
    Object2IntOpenHashMap<String> dictionary, @Nullable AggregationFunction aggFunction)
    throws IOException {
  ColumnDataType type = dataSchema.getColumnDataType(colId).getStoredType();
  Object[] cells = columns.get(colId);
  int rowCount = cells.length;
  // Casts below are exact on purpose (Integer for INT, Long for LONG, ...): a value of the wrong Java type is a
  // schema violation and should surface immediately rather than be silently converted.
  switch (type) {
    // ---------------- Single-value columns ----------------
    case INT: {
      int placeholder = (int) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        int intValue;
        if (cell != null) {
          intValue = (int) cell;
        } else {
          nullBitmap.add(row);
          intValue = placeholder;
        }
        fixedSize.putInt(intValue);
      }
      break;
    }
    case LONG: {
      long placeholder = (long) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        long longValue;
        if (cell != null) {
          longValue = (long) cell;
        } else {
          nullBitmap.add(row);
          longValue = placeholder;
        }
        fixedSize.putLong(longValue);
      }
      break;
    }
    case FLOAT: {
      float placeholder = (float) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        float floatValue;
        if (cell != null) {
          floatValue = (float) cell;
        } else {
          nullBitmap.add(row);
          floatValue = placeholder;
        }
        fixedSize.putFloat(floatValue);
      }
      break;
    }
    case DOUBLE: {
      double placeholder = (double) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        double doubleValue;
        if (cell != null) {
          doubleValue = (double) cell;
        } else {
          nullBitmap.add(row);
          doubleValue = placeholder;
        }
        fixedSize.putDouble(doubleValue);
      }
      break;
    }
    case BIG_DECIMAL: {
      BigDecimal placeholder = (BigDecimal) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        BigDecimal decimal;
        if (cell != null) {
          decimal = (BigDecimal) cell;
        } else {
          nullBitmap.add(row);
          decimal = placeholder;
        }
        setColumn(fixedSize, varSize, decimal);
      }
      break;
    }
    case STRING: {
      // New dictionary entries get the next sequential id.
      ToIntFunction<String> nextId = key -> dictionary.size();
      // The placeholder is registered before the loop, even when the column ends up containing no nulls.
      int placeholderId = dictionary.computeIfAbsent((String) type.getNullPlaceholder(), nextId);
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        int id;
        if (cell != null) {
          id = dictionary.computeIfAbsent((String) cell, nextId);
        } else {
          nullBitmap.add(row);
          id = placeholderId;
        }
        fixedSize.putInt(id);
      }
      break;
    }
    case BYTES: {
      ByteArray placeholder = (ByteArray) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        ByteArray bytes;
        if (cell != null) {
          bytes = (ByteArray) cell;
        } else {
          nullBitmap.add(row);
          bytes = placeholder;
        }
        setColumn(fixedSize, varSize, bytes);
      }
      break;
    }
    case MAP: {
      Map placeholder = (Map) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        Map map;
        if (cell != null) {
          map = (Map) cell;
        } else {
          nullBitmap.add(row);
          map = placeholder;
        }
        setColumn(fixedSize, varSize, map);
      }
      break;
    }
    // ---------------- Multi-value columns ----------------
    case INT_ARRAY: {
      int[] placeholder = (int[]) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        int[] ints;
        if (cell != null) {
          ints = (int[]) cell;
        } else {
          nullBitmap.add(row);
          ints = placeholder;
        }
        setColumn(fixedSize, varSize, ints);
      }
      break;
    }
    case LONG_ARRAY: {
      long[] placeholder = (long[]) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        long[] longs;
        if (cell != null) {
          longs = (long[]) cell;
        } else {
          nullBitmap.add(row);
          longs = placeholder;
        }
        setColumn(fixedSize, varSize, longs);
      }
      break;
    }
    case FLOAT_ARRAY: {
      float[] placeholder = (float[]) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        float[] floats;
        if (cell != null) {
          floats = (float[]) cell;
        } else {
          nullBitmap.add(row);
          floats = placeholder;
        }
        setColumn(fixedSize, varSize, floats);
      }
      break;
    }
    case DOUBLE_ARRAY: {
      double[] placeholder = (double[]) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        double[] doubles;
        if (cell != null) {
          doubles = (double[]) cell;
        } else {
          nullBitmap.add(row);
          doubles = placeholder;
        }
        setColumn(fixedSize, varSize, doubles);
      }
      break;
    }
    case STRING_ARRAY: {
      String[] placeholder = (String[]) type.getNullPlaceholder();
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        String[] strings;
        if (cell != null) {
          strings = (String[]) cell;
        } else {
          nullBitmap.add(row);
          strings = placeholder;
        }
        setColumn(fixedSize, varSize, strings, dictionary);
      }
      break;
    }
    // ---------------- Aggregation intermediate results ----------------
    case OBJECT: {
      assert aggFunction != null;
      for (int row = 0; row < rowCount; row++) {
        Object cell = cells[row];
        if (cell != null) {
          setColumn(fixedSize, varSize, aggFunction.serializeIntermediateResult(cell));
        } else {
          // OBJECT nulls are encoded by setNull rather than via nullBitmap.
          setNull(fixedSize, varSize);
        }
      }
      break;
    }
    // ---------------- All-null column ----------------
    case UNKNOWN:
      for (int remaining = rowCount; remaining > 0; remaining--) {
        setNull(fixedSize, varSize);
      }
      break;
    default:
      throw new IllegalStateException(
          "Unsupported stored type: " + type + " for column: " + dataSchema.getColumnName(colId));
  }
}