JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner()

in dataset/src/main/cpp/jni_wrapper.cc [642:708]


// Builds a native scanner over an existing dataset and returns a native reference id
// for it.
//
// Parameters (all interpreted exactly as the code below uses them):
//   dataset_id           - native reference id resolved via RetrieveNativeInstance.
//   columns              - optional (may be null) array of column names to project.
//   substrait_projection - optional (may be null) byte buffer holding serialized
//                          expressions; every named expression in it becomes a
//                          projected column.
//   substrait_filter     - optional (may be null) byte buffer holding serialized
//                          expressions; each one must be of BOOL type.
//   batch_size           - forwarded to ScannerBuilder::BatchSize.
//   file_format_id       - fragment scan options are only applied when this is not -1
//                          and `options` is non-null.
//   options              - optional (may be null) array converted by ToStringMap.
//   memory_pool_id       - address of an arrow::MemoryPool; 0 is rejected.
//
// Steps are applied in the order written, so when both `columns` and
// `substrait_projection` are supplied, Project() is called twice.
//
// NOTE(review): error handling relies on JniThrow / JniGetOrThrow / JniAssertOkOrThrow
// not returning normally on failure and on JNI_METHOD_END(-1L) supplying the error
// return value -- confirm against the macro/helper definitions.
JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner(
    JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns,
    jobject substrait_projection, jobject substrait_filter, jlong batch_size,
    jint file_format_id, jobjectArray options, jlong memory_pool_id) {
  JNI_METHOD_START
  arrow::MemoryPool* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
  if (pool == nullptr) {
    JniThrow("Memory pool does not exist or has been closed");
  }
  std::shared_ptr<arrow::dataset::Dataset> dataset =
      RetrieveNativeInstance<arrow::dataset::Dataset>(dataset_id);
  std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
      JniGetOrThrow(dataset->NewScan());
  JniAssertOkOrThrow(scanner_builder->Pool(pool));

  // Projection by plain column names.
  if (columns != nullptr) {
    std::vector<std::string> column_vector = ToStringVector(env, columns);
    JniAssertOkOrThrow(scanner_builder->Project(column_vector));
  }

  // Projection by serialized expressions.
  if (substrait_projection != nullptr) {
    std::shared_ptr<arrow::Buffer> buffer =
        LoadArrowBufferFromByteBuffer(env, substrait_projection);
    arrow::engine::BoundExpressions bounded_expression =
        JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));

    std::vector<arrow::compute::Expression> project_exprs;
    std::vector<std::string> project_names;
    // The final size is known, so allocate once instead of growing per element.
    project_exprs.reserve(bounded_expression.named_expressions.size());
    project_names.reserve(bounded_expression.named_expressions.size());
    for (arrow::engine::NamedExpression& named_expression :
         bounded_expression.named_expressions) {
      project_exprs.push_back(std::move(named_expression.expression));
      project_names.push_back(std::move(named_expression.name));
    }
    JniAssertOkOrThrow(
        scanner_builder->Project(std::move(project_exprs), std::move(project_names)));
  }

  // Filter by serialized expression.
  if (substrait_filter != nullptr) {
    std::shared_ptr<arrow::Buffer> buffer =
        LoadArrowBufferFromByteBuffer(env, substrait_filter);
    arrow::engine::BoundExpressions bounded_expression =
        JniGetOrThrow(arrow::engine::DeserializeExpressions(*buffer));

    std::optional<arrow::compute::Expression> filter_expr = std::nullopt;
    // Every expression must be boolean. If several are present, the last one is the
    // one handed to the scanner builder (each iteration overwrites the previous).
    for (arrow::engine::NamedExpression& named_expression :
         bounded_expression.named_expressions) {
      if (named_expression.expression.type()->id() != arrow::Type::BOOL) {
        JniThrow("There is no filter expression in the expression provided");
      }
      // Validate first, then take ownership: `bounded_expression` is a local that is
      // not read again, so the expression can be moved instead of copied.
      filter_expr = std::move(named_expression.expression);
    }
    if (!filter_expr.has_value()) {
      JniThrow("The filter expression has not been provided");
    }
    JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr));
  }

  // Format-specific scan options; skipped when the format id is the -1 sentinel or
  // no options array was passed.
  if (file_format_id != -1 && options != nullptr) {
    std::unordered_map<std::string, std::string> option_map = ToStringMap(env, options);
    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
    JniAssertOkOrThrow(scanner_builder->FragmentScanOptions(scan_options));
  }
  JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size));

  auto scanner = JniGetOrThrow(scanner_builder->Finish());
  std::shared_ptr<DisposableScannerAdaptor> scanner_adaptor =
      JniGetOrThrow(DisposableScannerAdaptor::Create(scanner));
  jlong id = CreateNativeRef(scanner_adaptor);
  return id;
  JNI_METHOD_END(-1L)
}