in dataset/src/main/cpp/jni_wrapper.cc [642:708]
/// Builds a scanner over a previously created native dataset and registers it
/// as a native reference.
///
/// \param env                   JNI environment.
/// \param dataset_id            native reference to an arrow::dataset::Dataset.
/// \param columns               optional column names to project; skipped when null.
/// \param substrait_projection  optional byte buffer holding serialized Substrait
///                              expressions (decoded with
///                              arrow::engine::DeserializeExpressions). Each named
///                              expression becomes one projected column. Applied
///                              after `columns`, so it is the later Project() call.
/// \param substrait_filter      optional byte buffer holding serialized Substrait
///                              expressions. Every expression must be boolean; if
///                              several are given, the last one is used as filter.
/// \param batch_size            forwarded to ScannerBuilder::BatchSize.
/// \param file_format_id        format id used to build fragment scan options;
///                              -1 disables fragment scan options.
/// \param options               optional string array converted with ToStringMap
///                              into fragment scan options; skipped when null.
/// \param memory_pool_id        address of the arrow::MemoryPool to allocate from.
/// \return the id produced by CreateNativeRef for the DisposableScannerAdaptor,
///         or -1L on failure (NOTE(review): assumes JNI_METHOD_END turns the
///         thrown error into a pending Java exception — confirm).
JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner(
    JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns,
    jobject substrait_projection, jobject substrait_filter, jlong batch_size,
    jint file_format_id, jobjectArray options, jlong memory_pool_id) {
  JNI_METHOD_START
  arrow::MemoryPool* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
  if (pool == nullptr) {
    JniThrow("Memory pool does not exist or has been closed");
  }
  std::shared_ptr<arrow::dataset::Dataset> dataset =
      RetrieveNativeInstance<arrow::dataset::Dataset>(dataset_id);
  std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
      JniGetOrThrow(dataset->NewScan());
  JniAssertOkOrThrow(scanner_builder->Pool(pool));

  // Projection by plain column names.
  if (columns != nullptr) {
    std::vector<std::string> column_vector = ToStringVector(env, columns);
    JniAssertOkOrThrow(scanner_builder->Project(std::move(column_vector)));
  }

  // Projection by Substrait expressions: one output column per named expression.
  if (substrait_projection != nullptr) {
    std::shared_ptr<arrow::Buffer> buffer =
        LoadArrowBufferFromByteBuffer(env, substrait_projection);
    arrow::engine::BoundExpressions bounded_expression =
        JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
    std::vector<arrow::compute::Expression> project_exprs;
    std::vector<std::string> project_names;
    project_exprs.reserve(bounded_expression.named_expressions.size());
    project_names.reserve(bounded_expression.named_expressions.size());
    // bounded_expression is local and not used afterwards, so its parts can be moved.
    for (arrow::engine::NamedExpression& named_expression :
         bounded_expression.named_expressions) {
      project_exprs.push_back(std::move(named_expression.expression));
      project_names.push_back(std::move(named_expression.name));
    }
    JniAssertOkOrThrow(
        scanner_builder->Project(std::move(project_exprs), std::move(project_names)));
  }

  // Row filter from Substrait expressions; every expression must be boolean.
  if (substrait_filter != nullptr) {
    std::shared_ptr<arrow::Buffer> buffer =
        LoadArrowBufferFromByteBuffer(env, substrait_filter);
    arrow::engine::BoundExpressions bounded_expression =
        JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));
    std::optional<arrow::compute::Expression> filter_expr;
    for (arrow::engine::NamedExpression& named_expression :
         bounded_expression.named_expressions) {
      const auto& type = named_expression.expression.type();
      // An unbound expression reports no type; reject it like a non-boolean
      // expression rather than dereferencing a null type.
      if (type == nullptr || type->id() != arrow::Type::BOOL) {
        JniThrow("There is no filter expression in the expression provided");
      }
      // When several boolean expressions are supplied, the last one wins.
      filter_expr = std::move(named_expression.expression);
    }
    if (!filter_expr.has_value()) {
      JniThrow("The filter expression has not been provided");
    }
    JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr));
  }

  // Format-specific fragment scan options are only applied when both a format
  // id (not -1) and an options array are supplied.
  if (file_format_id != -1 && options != nullptr) {
    std::unordered_map<std::string, std::string> option_map = ToStringMap(env, options);
    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
    JniAssertOkOrThrow(scanner_builder->FragmentScanOptions(scan_options));
  }
  JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size));
  auto scanner = JniGetOrThrow(scanner_builder->Finish());
  std::shared_ptr<DisposableScannerAdaptor> scanner_adaptor =
      JniGetOrThrow(DisposableScannerAdaptor::Create(scanner));
  return CreateNativeRef(scanner_adaptor);
  JNI_METHOD_END(-1L)
}