fn get_next()

in native/core/src/execution/operators/scan.rs [199:293]


    fn get_next(
        exec_context_id: i64,
        iter: &JObject,
        num_cols: usize,
        jvm_fetch_time: &Time,
        arrow_ffi_time: &Time,
    ) -> Result<InputBatch, CometError> {
        if exec_context_id == TEST_EXEC_CONTEXT_ID {
            // This is a unit test. We don't need to call JNI.
            return Ok(InputBatch::EOF);
        }

        if iter.is_null() {
            return Err(CometError::from(ExecutionError::GeneralError(format!(
                "Null batch iterator object. Plan id: {}",
                exec_context_id
            ))));
        }

        let mut env = JVMClasses::get_env()?;

        let mut timer = jvm_fetch_time.timer();

        let num_rows: i32 = unsafe {
            jni_call!(&mut env,
        comet_batch_iterator(iter).has_next() -> i32)?
        };

        timer.stop();

        if num_rows == -1 {
            return Ok(InputBatch::EOF);
        }

        let mut timer = arrow_ffi_time.timer();

        let mut array_addrs = Vec::with_capacity(num_cols);
        let mut schema_addrs = Vec::with_capacity(num_cols);

        for _ in 0..num_cols {
            let arrow_array = Rc::new(FFI_ArrowArray::empty());
            let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
            let (array_ptr, schema_ptr) = (
                Rc::into_raw(arrow_array) as i64,
                Rc::into_raw(arrow_schema) as i64,
            );

            array_addrs.push(array_ptr);
            schema_addrs.push(schema_ptr);
        }

        // Prepare the java array parameters
        let long_array_addrs = env.new_long_array(num_cols as jsize)?;
        let long_schema_addrs = env.new_long_array(num_cols as jsize)?;

        env.set_long_array_region(&long_array_addrs, 0, &array_addrs)?;
        env.set_long_array_region(&long_schema_addrs, 0, &schema_addrs)?;

        let array_obj = JObject::from(long_array_addrs);
        let schema_obj = JObject::from(long_schema_addrs);

        let array_obj = JValueGen::Object(array_obj.as_ref());
        let schema_obj = JValueGen::Object(schema_obj.as_ref());

        let num_rows: i32 = unsafe {
            jni_call!(&mut env,
        comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)?
        };

        // we already checked for end of results on call to has_next() so should always
        // have a valid row count when calling next()
        assert!(num_rows != -1);

        let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols);

        for i in 0..num_cols {
            let array_ptr = array_addrs[i];
            let schema_ptr = schema_addrs[i];
            let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;

            // TODO: validate array input data

            inputs.push(make_array(array_data));

            // Drop the Arcs to avoid memory leak
            unsafe {
                Rc::from_raw(array_ptr as *const FFI_ArrowArray);
                Rc::from_raw(schema_ptr as *const FFI_ArrowSchema);
            }
        }

        timer.stop();

        Ok(InputBatch::new(inputs, Some(num_rows as usize)))
    }