in pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ArrowResponseEncoder.java [71:381]
private VectorSchemaRoot createVectorSchemaRoot(ResultTable resultTable, DataSchema schema, int startRow,
int length) {
List<Field> fields = new ArrayList<>();
List<FieldVector> vectors = new ArrayList<>();
int numColumns = schema.getColumnNames().length; // assuming getter returns String[]
// Create an Arrow Field and vector for each column based on its type.
for (int col = 0; col < numColumns; col++) {
String colName = schema.getColumnNames()[col];
DataSchema.ColumnDataType colType = schema.getColumnDataTypes()[col];
Field field;
FieldVector vector;
switch (colType) {
case BOOLEAN:
field = new Field(colName, FieldType.nullable(new ArrowType.Bool()), null);
vector = new org.apache.arrow.vector.BitVector(colName, ALLOCATOR);
break;
case INT:
field = new Field(colName, FieldType.nullable(new ArrowType.Int(32, true)), null);
vector = new IntVector(colName, ALLOCATOR);
break;
case LONG:
field = new Field(colName, FieldType.nullable(new ArrowType.Int(64, true)), null);
vector = new BigIntVector(colName, ALLOCATOR);
break;
case FLOAT:
field =
new Field(colName, FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null);
vector = new Float4Vector(colName, ALLOCATOR);
break;
case DOUBLE:
field =
new Field(colName, FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null);
vector = new Float8Vector(colName, ALLOCATOR);
break;
case TIMESTAMP:
case STRING:
case BYTES:
case BIG_DECIMAL:
case JSON:
case OBJECT:
field = new Field(colName, FieldType.nullable(new ArrowType.Utf8()), null);
vector = new VarCharVector(colName, ALLOCATOR);
break;
case MAP:
field = new Field(colName, FieldType.nullable(new ArrowType.Binary()), null);
vector = new VarBinaryVector(colName, ALLOCATOR);
break;
case UNKNOWN:
// handle Null value
field = new Field(colName, FieldType.nullable(new ArrowType.Null()), null);
vector = new NullVector(colName);
break;
case BOOLEAN_ARRAY:
// Define the inner field for a boolean element.
List<Field> children =
Collections.singletonList(new Field("element", FieldType.nullable(new ArrowType.Bool()), null));
// Define the field for the list column.
field = new Field(colName, FieldType.nullable(new ArrowType.List()), children);
// Create a ListVector using the field name and allocator.
vector = ListVector.empty(colName, ALLOCATOR);
// Initialize its children from the defined field.
vector.initializeChildrenFromFields(children);
break;
case INT_ARRAY:
// Define the inner field for an int element.
children =
Collections.singletonList(new Field("element", FieldType.nullable(new ArrowType.Int(32, true)), null));
// Define the field for the list column.
field = new Field(colName, FieldType.nullable(new ArrowType.List()), children);
// Create a ListVector using the field name and allocator.
vector = ListVector.empty(colName, ALLOCATOR);
// Initialize its children from the defined field.
vector.initializeChildrenFromFields(children);
break;
case LONG_ARRAY:
// Define the inner field for a long element.
children =
Collections.singletonList(new Field("element", FieldType.nullable(new ArrowType.Int(64, true)), null));
// Define the field for the list column.
field = new Field(colName, FieldType.nullable(new ArrowType.List()), children);
// Create a ListVector using the field name and allocator.
vector = ListVector.empty(colName, ALLOCATOR);
// Initialize its children from the defined field.
vector.initializeChildrenFromFields(children);
break;
case FLOAT_ARRAY:
// Define the inner field for a float element.
children = Collections.singletonList(
new Field("element", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)),
null));
// Define the field for the list column.
field = new Field(colName, FieldType.nullable(new ArrowType.List()), children);
// Create a ListVector using the field name and allocator.
vector = ListVector.empty(colName, ALLOCATOR);
// Initialize its children from the defined field.
vector.initializeChildrenFromFields(children);
break;
case DOUBLE_ARRAY:
// Define the inner field for a double element.
children = Collections.singletonList(
new Field("element", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
null));
// Define the field for the list column.
field = new Field(colName, FieldType.nullable(new ArrowType.List()), children);
// Create a ListVector using the field name and allocator.
vector = ListVector.empty(colName, ALLOCATOR);
// Initialize its children from the defined field.
vector.initializeChildrenFromFields(children);
break;
case TIMESTAMP_ARRAY:
case STRING_ARRAY:
case BYTES_ARRAY:
// Define the inner field for a string element.
children = Collections.singletonList(new Field("element", FieldType.nullable(new ArrowType.Utf8()), null));
// Define the field for the list column.
field = new Field(colName, FieldType.nullable(new ArrowType.List()), children);
// Create a ListVector using the field name and allocator.
vector = ListVector.empty(colName, ALLOCATOR);
// Initialize its children from the defined field.
vector.initializeChildrenFromFields(children);
break;
default:
throw new UnsupportedOperationException("Unsupported column type: " + colType);
}
fields.add(field);
vector.allocateNew();
vectors.add(vector);
}
// Determine the actual end row.
int availableRows = resultTable.getRows().size();
int endRow = Math.min(startRow + length, availableRows);
// Write the rows from startRow to endRow into the corresponding Arrow vectors.
for (int i = startRow; i < endRow; i++) {
Object[] row = resultTable.getRows().get(i);
int rowIndex = i - startRow;
for (int col = 0; col < numColumns; col++) {
FieldVector vector = vectors.get(col);
Object value = row[col];
if (value == null) {
vector.setNull(rowIndex);
} else {
DataSchema.ColumnDataType colType = schema.getColumnDataTypes()[col];
switch (colType) {
case BOOLEAN:
((org.apache.arrow.vector.BitVector) vector).setSafe(rowIndex, ((Boolean) value) ? 1 : 0);
break;
case INT:
((IntVector) vector).setSafe(rowIndex, ((Number) value).intValue());
break;
case LONG:
((BigIntVector) vector).setSafe(rowIndex, ((Number) value).longValue());
break;
case FLOAT:
((Float4Vector) vector).setSafe(rowIndex, ((Number) value).floatValue());
break;
case DOUBLE:
((Float8Vector) vector).setSafe(rowIndex, ((Number) value).doubleValue());
break;
case TIMESTAMP:
case STRING:
case BYTES:
case BIG_DECIMAL:
case JSON:
case OBJECT:
byte[] bytes = ((String) value).getBytes(StandardCharsets.UTF_8);
((VarCharVector) vector).setSafe(rowIndex, bytes);
break;
case MAP:
byte[] mapValueBytes = MapUtils.serializeMap((Map) value);
((VarBinaryVector) vector).setSafe(rowIndex, mapValueBytes);
break;
case UNKNOWN:
// Handle null value
vector.setNull(rowIndex);
break;
case BOOLEAN_ARRAY:
ListVector booleanArrayVector = (ListVector) vector;
boolean[] booleanArray = (boolean[]) value;
// Start a new list entry for the current row.
booleanArrayVector.startNewValue(rowIndex);
// Retrieve the underlying child vector (BitVector).
BitVector bitVector = (BitVector) booleanArrayVector.getDataVector();
// Determine the current position in the child vector.
int bitVectorIndex = bitVector.getValueCount();
// Write each boolean into the child vector.
for (boolean v : booleanArray) {
bitVector.setSafe(bitVectorIndex, v ? 1 : 0);
bitVectorIndex++;
}
// Update the child vector's value count.
bitVector.setValueCount(bitVectorIndex);
// Finalize the list entry by indicating the number of elements added.
booleanArrayVector.endValue(rowIndex, booleanArray.length);
break;
case INT_ARRAY:
ListVector intArrayVector = (ListVector) vector;
int[] intArr = (int[]) value;
// Start a new list entry for the current row.
intArrayVector.startNewValue(rowIndex);
// Retrieve the underlying child vector (IntVector).
IntVector intVector = (IntVector) intArrayVector.getDataVector();
// Determine the current position in the child vector.
int intVectorIndex = intVector.getValueCount();
// Write each integer into the child vector.
for (int v : intArr) {
intVector.setSafe(intVectorIndex, v);
intVectorIndex++;
}
// Update the child vector's value count.
intVector.setValueCount(intVectorIndex);
// Finalize the list entry by indicating the number of elements added.
intArrayVector.endValue(rowIndex, intArr.length);
break;
case LONG_ARRAY:
ListVector longArrayVector = (ListVector) vector;
long[] longArray = (long[]) value;
// Start a new list entry for the current row.
longArrayVector.startNewValue(rowIndex);
// Retrieve the underlying child vector (BigIntVector).
BigIntVector bigIntVector = (BigIntVector) longArrayVector.getDataVector();
// Determine the current position in the child vector.
int bigIntVectorIndex = bigIntVector.getValueCount();
// Write each long into the child vector.
for (long v : longArray) {
bigIntVector.setSafe(bigIntVectorIndex, v);
bigIntVectorIndex++;
}
// Update the child vector's value count.
bigIntVector.setValueCount(bigIntVectorIndex);
// Finalize the list entry by indicating the number of elements added.
longArrayVector.endValue(rowIndex, longArray.length);
break;
case FLOAT_ARRAY:
ListVector floatArrayVector = (ListVector) vector;
float[] floatArray = (float[]) value;
// Start a new list entry for the current row.
floatArrayVector.startNewValue(rowIndex);
// Retrieve the underlying child vector (Float4Vector).
Float4Vector floatVector = (Float4Vector) floatArrayVector.getDataVector();
// Determine the current position in the child vector.
int floatVectorIndex = floatVector.getValueCount();
// Write each float into the child vector.
for (float v : floatArray) {
floatVector.setSafe(floatVectorIndex, v);
floatVectorIndex++;
}
// Update the child vector's value count.
floatVector.setValueCount(floatVectorIndex);
// Finalize the list entry by indicating the number of elements added.
floatArrayVector.endValue(rowIndex, floatArray.length);
break;
case DOUBLE_ARRAY:
ListVector doubleArrayVector = (ListVector) vector;
double[] doubleArray = (double[]) value;
// Start a new list entry for the current row.
doubleArrayVector.startNewValue(rowIndex);
// Retrieve the underlying child vector (Float8Vector).
Float8Vector doubleVector = (Float8Vector) doubleArrayVector.getDataVector();
// Determine the current position in the child vector.
int doubleVectorIndex = doubleVector.getValueCount();
// Write each double into the child vector.
for (double v : doubleArray) {
doubleVector.setSafe(doubleVectorIndex, v);
doubleVectorIndex++;
}
// Update the child vector's value count.
doubleVector.setValueCount(doubleVectorIndex);
// Finalize the list entry by indicating the number of elements added.
doubleArrayVector.endValue(rowIndex, doubleArray.length);
break;
case TIMESTAMP_ARRAY:
case STRING_ARRAY:
case BYTES_ARRAY:
ListVector listVector = (ListVector) vector;
String[] stringArray = (String[]) value;
// Start a new list entry for the current row.
listVector.startNewValue(rowIndex);
// Retrieve the underlying child vector (VarCharVector).
VarCharVector stringVector = (VarCharVector) listVector.getDataVector();
// Determine the current position in the child vector.
int stringVectorIndex = stringVector.getValueCount();
// Write each string into the child vector.
for (String v : stringArray) {
byte[] stringBytes = v.getBytes(StandardCharsets.UTF_8);
stringVector.setSafe(stringVectorIndex, stringBytes);
stringVectorIndex++;
}
// Update the child vector's value count.
stringVector.setValueCount(stringVectorIndex);
// Finalize the list entry by indicating the number of elements added.
listVector.endValue(rowIndex, stringArray.length);
break;
default:
throw new UnsupportedOperationException("Unsupported column type: " + colType);
}
}
}
}
// Set the value count on each vector.
int numRows = endRow - startRow;
for (FieldVector vector : vectors) {
vector.setValueCount(numRows);
}
Schema arrowSchema = new Schema(fields);
return new VectorSchemaRoot(arrowSchema, vectors, numRows);
}