in native/core/src/execution/operators/scan.rs [199:293]
fn get_next(
exec_context_id: i64,
iter: &JObject,
num_cols: usize,
jvm_fetch_time: &Time,
arrow_ffi_time: &Time,
) -> Result<InputBatch, CometError> {
if exec_context_id == TEST_EXEC_CONTEXT_ID {
// This is a unit test. We don't need to call JNI.
return Ok(InputBatch::EOF);
}
if iter.is_null() {
return Err(CometError::from(ExecutionError::GeneralError(format!(
"Null batch iterator object. Plan id: {}",
exec_context_id
))));
}
let mut env = JVMClasses::get_env()?;
let mut timer = jvm_fetch_time.timer();
let num_rows: i32 = unsafe {
jni_call!(&mut env,
comet_batch_iterator(iter).has_next() -> i32)?
};
timer.stop();
if num_rows == -1 {
return Ok(InputBatch::EOF);
}
let mut timer = arrow_ffi_time.timer();
let mut array_addrs = Vec::with_capacity(num_cols);
let mut schema_addrs = Vec::with_capacity(num_cols);
for _ in 0..num_cols {
let arrow_array = Rc::new(FFI_ArrowArray::empty());
let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
let (array_ptr, schema_ptr) = (
Rc::into_raw(arrow_array) as i64,
Rc::into_raw(arrow_schema) as i64,
);
array_addrs.push(array_ptr);
schema_addrs.push(schema_ptr);
}
// Prepare the java array parameters
let long_array_addrs = env.new_long_array(num_cols as jsize)?;
let long_schema_addrs = env.new_long_array(num_cols as jsize)?;
env.set_long_array_region(&long_array_addrs, 0, &array_addrs)?;
env.set_long_array_region(&long_schema_addrs, 0, &schema_addrs)?;
let array_obj = JObject::from(long_array_addrs);
let schema_obj = JObject::from(long_schema_addrs);
let array_obj = JValueGen::Object(array_obj.as_ref());
let schema_obj = JValueGen::Object(schema_obj.as_ref());
let num_rows: i32 = unsafe {
jni_call!(&mut env,
comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)?
};
// we already checked for end of results on call to has_next() so should always
// have a valid row count when calling next()
assert!(num_rows != -1);
let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols);
for i in 0..num_cols {
let array_ptr = array_addrs[i];
let schema_ptr = schema_addrs[i];
let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;
// TODO: validate array input data
inputs.push(make_array(array_data));
// Drop the Arcs to avoid memory leak
unsafe {
Rc::from_raw(array_ptr as *const FFI_ArrowArray);
Rc::from_raw(schema_ptr as *const FFI_ArrowSchema);
}
}
timer.stop();
Ok(InputBatch::new(inputs, Some(num_rows as usize)))
}