in native/core/src/execution/expressions/subquery.rs [82:190]
/// Fetches the already-computed scalar subquery result from the JVM and returns it
/// as a `ColumnarValue::Scalar`.
///
/// The input batch is ignored: the value is looked up purely by
/// `self.exec_context_id` and `self.id` through static calls on `comet_exec`.
///
/// The JVM is first asked whether the result is null; if so, the scalar produced by
/// `ScalarValue::try_from(&self.data_type)` is returned. Otherwise the getter matching
/// `self.data_type` is invoked and its result wrapped in the corresponding
/// `ScalarValue` variant. `Date32` is read through the int getter and microsecond
/// timestamps through the long getter.
///
/// # Errors
///
/// Returns an error when the JNI environment cannot be obtained, when any JNI call
/// fails, when a returned byte array or string cannot be copied out of the JVM, or
/// when `self.data_type` is not one of the types handled below.
fn evaluate(&self, _: &RecordBatch) -> datafusion::common::Result<ColumnarValue> {
    /// Builds the error returned when a JVM object could not be converted to a Rust value.
    fn jni_read_error(
        what: &str,
        err: impl std::fmt::Display,
    ) -> datafusion::common::DataFusionError {
        datafusion::common::DataFusionError::Execution(format!(
            "Failed to read {} scalar subquery result from JVM: {}",
            what, err
        ))
    }

    let mut env = JVMClasses::get_env()?;
    // NOTE(review): the `unsafe` block is kept exactly as wide as before because every
    // `jni_static_call!` sits inside it; presumably the macro performs unchecked JNI
    // calls whose signatures must match the Java side — confirm against its definition.
    unsafe {
        let is_null = jni_static_call!(&mut env,
            comet_exec.is_null(self.exec_context_id, self.id) -> jboolean
        )?;
        if is_null > 0 {
            return Ok(ColumnarValue::Scalar(ScalarValue::try_from(
                &self.data_type,
            )?));
        }

        match &self.data_type {
            DataType::Boolean => {
                let r = jni_static_call!(&mut env,
                    comet_exec.get_bool(self.exec_context_id, self.id) -> jboolean
                )?;
                Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(r > 0))))
            }
            DataType::Int8 => {
                let r = jni_static_call!(&mut env,
                    comet_exec.get_byte(self.exec_context_id, self.id) -> jbyte
                )?;
                Ok(ColumnarValue::Scalar(ScalarValue::Int8(Some(r))))
            }
            DataType::Int16 => {
                let r = jni_static_call!(&mut env,
                    comet_exec.get_short(self.exec_context_id, self.id) -> jshort
                )?;
                Ok(ColumnarValue::Scalar(ScalarValue::Int16(Some(r))))
            }
            DataType::Int32 => {
                let r = jni_static_call!(&mut env,
                    comet_exec.get_int(self.exec_context_id, self.id) -> jint
                )?;
                Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(r))))
            }
            DataType::Int64 => {
                let r = jni_static_call!(&mut env,
                    comet_exec.get_long(self.exec_context_id, self.id) -> jlong
                )?;
                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(r))))
            }
            DataType::Float32 => {
                let r = jni_static_call!(&mut env,
                    comet_exec.get_float(self.exec_context_id, self.id) -> f32
                )?;
                Ok(ColumnarValue::Scalar(ScalarValue::Float32(Some(r))))
            }
            DataType::Float64 => {
                let r = jni_static_call!(&mut env,
                    comet_exec.get_double(self.exec_context_id, self.id) -> f64
                )?;
                Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some(r))))
            }
            DataType::Decimal128(p, s) => {
                let bytes = jni_static_call!(&mut env,
                    comet_exec.get_decimal(self.exec_context_id, self.id) -> BinaryWrapper
                )?;
                let bytes: &JByteArray = bytes.get().into();
                // Propagate a failed array copy instead of panicking.
                let slice = env
                    .convert_byte_array(bytes)
                    .map_err(|e| jni_read_error("decimal", e))?;
                Ok(ColumnarValue::Scalar(ScalarValue::Decimal128(
                    Some(bytes_to_i128(&slice)),
                    *p,
                    *s,
                )))
            }
            DataType::Date32 => {
                let r = jni_static_call!(&mut env,
                    comet_exec.get_int(self.exec_context_id, self.id) -> jint
                )?;
                Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some(r))))
            }
            DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
                let r = jni_static_call!(&mut env,
                    comet_exec.get_long(self.exec_context_id, self.id) -> jlong
                )?;
                Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
                    Some(r),
                    timezone.clone(),
                )))
            }
            DataType::Utf8 => {
                let string = jni_static_call!(&mut env,
                    comet_exec.get_string(self.exec_context_id, self.id) -> StringWrapper
                )?;
                // Propagate a failed string fetch instead of panicking.
                let string: String = env
                    .get_string(string.get())
                    .map_err(|e| jni_read_error("string", e))?
                    .into();
                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(string))))
            }
            DataType::Binary => {
                let bytes = jni_static_call!(&mut env,
                    comet_exec.get_binary(self.exec_context_id, self.id) -> BinaryWrapper
                )?;
                let bytes: &JByteArray = bytes.get().into();
                // Propagate a failed array copy instead of panicking.
                let slice = env
                    .convert_byte_array(bytes)
                    .map_err(|e| jni_read_error("binary", e))?;
                Ok(ColumnarValue::Scalar(ScalarValue::Binary(Some(slice))))
            }
            _ => internal_err!("Unsupported scalar subquery data type {:?}", self.data_type),
        }
    }
}