modules/platforms/python/cpp_module/py_cursor.cpp (428 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <ignite/odbc/sql_statement.h>
#include <ignite/odbc/query/data_query.h>
#include "module.h"
#include "utils.h"
#include "py_cursor.h"
#include "type_conversion.h"
#include <Python.h>
/**
* Write row of the param set using a provided writer.
*
* @param writer Writer.
* @param params_row Parameter Row.
* @param row_size_expected Expected size of the row.
*/
void write_row(ignite::protocol::writer &writer, PyObject *params_row, std::int32_t row_size_expected) {
if (!params_row || params_row == Py_None) {
throw ignite::ignite_error("Parameter row can not be None");
}
if (!PySequence_Check(params_row)) {
throw ignite::ignite_error(std::string("Parameter row does not provide the sequence protocol: ") +
py_object_get_typename(params_row));
}
Py_ssize_t seq_size{PySequence_Size(params_row)};
if (seq_size < 0) {
throw ignite::ignite_error("Internal error while getting size of the parameter list sequence");
}
auto row_size = std::int32_t(seq_size);
if (row_size != row_size_expected) {
throw ignite::ignite_error("Row size is unexpected: " + std::to_string(row_size) +
", expected row size: " + std::to_string(row_size_expected));
}
ignite::binary_tuple_builder row_builder{row_size * 3};
row_builder.start();
for (std::int32_t idx = 0; idx < row_size; ++idx) {
submit_pyobject(row_builder, PySequence_GetItem(params_row, idx), true);
}
row_builder.layout();
for (std::int32_t idx = 0; idx < row_size; ++idx) {
submit_pyobject(row_builder, PySequence_GetItem(params_row, idx), false);
}
auto row_data = row_builder.build();
writer.write_binary(row_data);
}
/**
* Python parameter set.
*/
class py_parameter_set : public ignite::parameter_set {
public:
/**
* Constructor.
*
* @param size Size of the row.
* @param params Python parameters sequence.
*/
py_parameter_set(Py_ssize_t size, PyObject *params) : m_size(size), m_params(params) {}
/**
* Write only the first row of the param set using a provided writer.
*
* @param writer Writer.
*/
void write(ignite::protocol::writer &writer) const override {
if (!m_size) {
writer.write_nil();
return;
}
writer.write(std::int32_t(m_size));
write_row(writer, m_params, std::int32_t(m_size));
}
/**
* Write rows of the param set in an interval [begin, end) using a provided writer.
*
* @param writer Writer.
* @param begin Beginning of the interval.
* @param end End of the interval.
* @param last Last page flag.
*/
void write(ignite::protocol::writer &writer, SQLULEN begin, SQLULEN end, bool last) const override {
throw ignite::ignite_error("Execution with the batch of parameters is not implemented");
}
/**
* Get parameter set size.
*
* @return Number of rows in a set.
*/
[[nodiscard]] std::int32_t get_param_set_size() const override {
return 1;
}
/**
* Set the number of parameters processed in batch.
*
* @param processed Processed.
*/
void set_params_processed(SQLULEN processed) override { m_processed = processed; }
/**
* Get pointer to array in which to return the status of each set of parameters.
*
* @return Value.
*/
[[nodiscard]] SQLUSMALLINT *get_params_status_ptr() const override {
return nullptr;
}
private:
/** Size. */
Py_ssize_t m_size{0};
/** Python sequence of parameters. */
PyObject *m_params{nullptr};
/** Processed params. */
SQLULEN m_processed{0};
};
/**
* Python parameter list set.
*/
class py_parameter_list_set : public ignite::parameter_set {
public:
/**
* Constructor.
*
* @param size Size number of rows to insert.
* @param row_size Number of params in a single row.
* @param params Python parameter sequence list.
*/
py_parameter_list_set(Py_ssize_t size, Py_ssize_t row_size, PyObject *params)
: m_size(size)
, m_row_size(row_size)
, m_params(params) {}
/**
* Write only the first row of the param set using a provided writer.
*
* @param writer Writer.
*/
void write(ignite::protocol::writer &writer) const override {
PyObject *row = PySequence_GetItem(m_params, 0);
if (!m_row_size) {
writer.write_nil();
return;
}
writer.write(std::int32_t(m_row_size));
write_row(writer, row, std::int32_t(m_row_size));
}
/**
* Write rows of the param set in interval [begin, end) using provided writer.
*
* @param writer Writer.
* @param begin Beginning of the interval.
* @param end End of the interval.
* @param last Last page flag.
*/
void write(ignite::protocol::writer &writer, SQLULEN begin, SQLULEN end, bool last) const override {
Py_ssize_t interval_end = std::min(m_size, Py_ssize_t(end));
std::int32_t rows_num = std::int32_t(interval_end) - std::int32_t(begin);
writer.write(std::int32_t(m_row_size));
writer.write(rows_num);
writer.write_bool(last);
for (auto i = Py_ssize_t(begin); i < interval_end; ++i) {
PyObject *row = PySequence_GetItem(m_params, i);
write_row(writer, row, std::int32_t(m_row_size));
}
}
/**
* Get parameter set size.
*
* @return Number of rows in a set.
*/
[[nodiscard]] std::int32_t get_param_set_size() const override {
return std::int32_t(m_size);
}
/**
* Set the number of parameters processed in batch.
*
* @param processed Processed.
*/
void set_params_processed(SQLULEN processed) override { m_processed = processed; }
/**
* Get pointer to array in which to return the status of each set of parameters.
*
* @return Value.
*/
[[nodiscard]] SQLUSMALLINT *get_params_status_ptr() const override {
return nullptr;
}
private:
/** Rows number. */
Py_ssize_t m_size{0};
/** Row size. */
Py_ssize_t m_row_size{0};
/** Python sequence of parameters. */
PyObject *m_params{nullptr};
/** Processed params. */
SQLULEN m_processed{0};
};
/**
* Check if the cursor is open. Set error if not.
* @param self Cursor.
* @return @c true if open and @c false, otherwise.
*/
static bool py_cursor_expect_open(const py_cursor* self) {
if (!self->m_statement) {
PyErr_SetString(py_get_module_interface_error_class(), "Cursor is in invalid state (Already closed?)");
return false;
}
return true;
}
int py_cursor_init(py_cursor *self, PyObject *args, PyObject *kwds)
{
UNUSED_VALUE args;
UNUSED_VALUE kwds;
self->m_statement = nullptr;
return 0;
}
void py_cursor_dealloc(py_cursor *self)
{
delete self->m_statement;
self->m_statement = nullptr;
Py_TYPE(self)->tp_free(self);
}
static PyObject* py_cursor_close(py_cursor* self, PyObject*)
{
if (self->m_statement) {
self->m_statement->close();
if (!check_errors(*self->m_statement))
return nullptr;
delete self->m_statement;
self->m_statement = nullptr;
}
Py_RETURN_NONE;
}
static PyObject* py_cursor_execute(py_cursor* self, PyObject* args, PyObject* kwargs)
{
if (!py_cursor_expect_open(self))
return nullptr;
static char *kwlist[] = {
"query",
"params",
nullptr
};
const char* query = nullptr;
PyObject *params = nullptr;
int parsed = PyArg_ParseTupleAndKeywords(args, kwargs, "s|O", kwlist, &query, ¶ms);
if (!parsed)
return nullptr;
Py_ssize_t size{0};
if (params && params != Py_None) {
if (PySequence_Check(params)) {
size = PySequence_Size(params);
if (size < 0) {
PyErr_SetString(py_get_module_interface_error_class(),
"Internal error while getting size of the parameters sequence");
return nullptr;
}
} else {
auto msg_str = std::string("The object does not provide the sequence protocol: ")
+ py_object_get_typename(params);
PyErr_SetString(py_get_module_interface_error_class(), msg_str.c_str());
return nullptr;
}
}
py_parameter_set py_params(size, params);
self->m_statement->execute_sql_query(query, py_params);
if (!check_errors(*self->m_statement))
return nullptr;
Py_RETURN_NONE;
}
static PyObject* py_cursor_executemany(py_cursor* self, PyObject* args, PyObject* kwargs)
{
if (!py_cursor_expect_open(self))
return nullptr;
static char *kwlist[] = {
"query",
"params_list",
nullptr
};
const char* query = nullptr;
PyObject *params_list = nullptr;
int parsed = PyArg_ParseTupleAndKeywords(args, kwargs, "s|O", kwlist, &query, ¶ms_list);
if (!parsed)
return nullptr;
Py_ssize_t size{0};
Py_ssize_t row_size{0};
if (params_list && params_list != Py_None) {
if (PySequence_Check(params_list)) {
size = PySequence_Size(params_list);
if (size < 0) {
PyErr_SetString(py_get_module_interface_error_class(),
"Internal error while getting size of the parameter list sequence");
return nullptr;
}
if (size > 0) {
PyObject *row0 = PySequence_GetItem(params_list, 0);
if (row0 == nullptr) {
PyErr_SetString(py_get_module_interface_error_class(),
"Can not get a first element of the parameter sequence");
}
if (!PySequence_Check(row0)) {
auto msg_str = std::string(
"A first element of the parameter sequence does not provide the sequence protocol: ")
+ py_object_get_typename(params_list);
PyErr_SetString(py_get_module_interface_error_class(), msg_str.c_str());
}
row_size = PySequence_Size(row0);
if (row_size < 0) {
PyErr_SetString(py_get_module_interface_error_class(),
"Internal error while getting size of the first parameter row");
return nullptr;
}
}
} else {
auto msg_str = std::string("The object does not provide the sequence protocol: ")
+ py_object_get_typename(params_list);
PyErr_SetString(py_get_module_interface_error_class(), msg_str.c_str());
return nullptr;
}
}
py_parameter_list_set py_params_list(size, row_size, params_list);
self->m_statement->execute_sql_query(query, py_params_list);
if (!check_errors(*self->m_statement))
return nullptr;
Py_RETURN_NONE;
}
static PyObject* py_cursor_rowcount(py_cursor* self, PyObject*)
{
if (!py_cursor_expect_open(self))
return nullptr;
auto query = self->m_statement->get_query();
if (!query)
return PyLong_FromLong(-1);
return PyLong_FromLong(long(query->affected_rows()));
}
static PyObject* py_cursor_fetchone(py_cursor* self, PyObject*)
{
if (!py_cursor_expect_open(self))
return nullptr;
auto query = self->m_statement->get_query();
if (!query) {
PyErr_SetString(py_get_module_interface_error_class(), "Query was not executed");
return nullptr;
}
if (query->get_type() != ignite::query_type::DATA) {
auto err_msg = "Unexpected query type: " + std::to_string(int(query->get_type()));
PyErr_SetString(py_get_module_interface_error_class(), err_msg.c_str());
return nullptr;
}
if (!query->is_data_available()) {
Py_RETURN_NONE;
}
auto& query0 = static_cast<ignite::data_query&>(*query);
auto res = query0.fetch_next_row();
if (res == ignite::sql_result::AI_NO_DATA) {
Py_RETURN_NONE;
}
if (!check_errors(res, *self->m_statement)) {
return nullptr;
}
auto row = query0.get_current_row();
auto res_list = PyTuple_New(row.size());
if (!res_list) {
PyErr_SetString(py_get_module_operational_error_class(), "Can not allocate a new list for the result set");
return nullptr;
}
for (std::size_t i = 0; i < row.size(); ++i) {
auto py_column = primitive_to_pyobject(row[i]);
if (!py_column) {
Py_DECREF(res_list);
return nullptr;
}
PyTuple_SetItem(res_list, i, py_column);
}
return res_list;
}
static PyObject* py_cursor_column_count(py_cursor* self, PyObject*)
{
if (!py_cursor_expect_open(self))
return nullptr;
auto query = self->m_statement->get_query();
if (!query)
return PyLong_FromLong(0);
return PyLong_FromLong(long(query->get_meta()->size()));
}
const ignite::column_meta *get_meta_column(py_cursor* self, long idx, PyObject *&err_ret) {
err_ret = nullptr;
auto query = self->m_statement->get_query();
if (!query) {
Py_INCREF(Py_None);
err_ret = Py_None;
return nullptr;
}
auto meta = query->get_meta();
if (!meta) {
Py_INCREF(Py_None);
err_ret = Py_None;
return nullptr;
}
if (idx < 0 || idx >= long(meta->size())) {
PyErr_SetString(py_get_module_interface_error_class(), "Column metadata index is out of bound");
return nullptr;
}
return &meta->at(idx);
}
static PyObject* py_cursor_column_name(py_cursor* self, PyObject* args)
{
if (!py_cursor_expect_open(self))
return nullptr;
long idx{0};
int parsed = PyArg_ParseTuple(args, "l", &idx);
if (!parsed)
return nullptr;
PyObject* err{nullptr};
auto column = get_meta_column(self, idx, err);
if (!column)
return err;
return PyUnicode_FromStringAndSize(column->get_column_name().data(), column->get_column_name().size());
}
static PyObject* py_cursor_column_type_code(py_cursor* self, PyObject* args)
{
if (!py_cursor_expect_open(self))
return nullptr;
long idx{0};
int parsed = PyArg_ParseTuple(args, "l", &idx);
if (!parsed)
return nullptr;
PyObject* err{nullptr};
auto column = get_meta_column(self, idx, err);
if (!column)
return err;
return PyLong_FromLong(long(column->get_data_type()));
}
static PyObject* py_cursor_column_display_size(py_cursor* self, PyObject*)
{
if (!py_cursor_expect_open(self))
return nullptr;
Py_RETURN_NONE;
}
static PyObject* py_cursor_column_internal_size(py_cursor* self, PyObject*)
{
if (!py_cursor_expect_open(self))
return nullptr;
Py_RETURN_NONE;
}
static PyObject* py_cursor_column_precision(py_cursor* self, PyObject* args)
{
if (!py_cursor_expect_open(self))
return nullptr;
long idx{0};
int parsed = PyArg_ParseTuple(args, "l", &idx);
if (!parsed)
return nullptr;
PyObject* err{nullptr};
auto column = get_meta_column(self, idx, err);
if (!column)
return err;
return PyLong_FromLong(long(column->get_precision()));
}
static PyObject* py_cursor_column_scale(py_cursor* self, PyObject* args)
{
if (!py_cursor_expect_open(self))
return nullptr;
long idx{0};
int parsed = PyArg_ParseTuple(args, "l", &idx);
if (!parsed)
return nullptr;
PyObject* err{nullptr};
auto column = get_meta_column(self, idx, err);
if (!column)
return err;
return PyLong_FromLong(long(column->get_scale()));
}
static PyObject* py_cursor_null_ok(py_cursor* self, PyObject* args)
{
if (!py_cursor_expect_open(self))
return nullptr;
long idx{0};
int parsed = PyArg_ParseTuple(args, "l", &idx);
if (!parsed)
return nullptr;
PyObject* err{nullptr};
auto column = get_meta_column(self, idx, err);
if (!column)
return err;
return PyBool_FromLong(long(column->get_nullability() == ignite::nullability::NULLABLE));
}
static PyTypeObject py_cursor_type = {
PyVarObject_HEAD_INIT(nullptr, 0)
EXT_MODULE_NAME "." PY_CURSOR_CLASS_NAME
};
static struct PyMethodDef py_cursor_methods[] = {
// Core methods
{"close", (PyCFunction)py_cursor_close, METH_NOARGS, nullptr},
{"execute", (PyCFunction)py_cursor_execute, METH_VARARGS | METH_KEYWORDS, nullptr},
{"executemany", (PyCFunction)py_cursor_executemany, METH_VARARGS | METH_KEYWORDS, nullptr},
{"rowcount", (PyCFunction)py_cursor_rowcount, METH_NOARGS, nullptr},
{"fetchone", (PyCFunction)py_cursor_fetchone, METH_NOARGS, nullptr},
// Column metadata retrieval methods
{"column_count", (PyCFunction)py_cursor_column_count, METH_NOARGS, nullptr},
{"column_name", (PyCFunction)py_cursor_column_name, METH_VARARGS, nullptr},
{"column_type_code", (PyCFunction)py_cursor_column_type_code, METH_VARARGS, nullptr},
{"column_display_size", (PyCFunction)py_cursor_column_display_size, METH_VARARGS, nullptr},
{"column_internal_size", (PyCFunction)py_cursor_column_internal_size, METH_VARARGS, nullptr},
{"column_precision", (PyCFunction)py_cursor_column_precision, METH_VARARGS, nullptr},
{"column_scale", (PyCFunction)py_cursor_column_scale, METH_VARARGS, nullptr},
{"column_null_ok", (PyCFunction)py_cursor_null_ok, METH_VARARGS, nullptr},
{nullptr, nullptr, 0, nullptr}
};
int prepare_py_cursor_type() {
py_cursor_type.tp_new = PyType_GenericNew;
py_cursor_type.tp_basicsize=sizeof(py_cursor);
py_cursor_type.tp_dealloc=(destructor)py_cursor_dealloc;
py_cursor_type.tp_flags=Py_TPFLAGS_DEFAULT;
py_cursor_type.tp_methods=py_cursor_methods;
py_cursor_type.tp_init=(initproc)py_cursor_init;
return PyType_Ready(&py_cursor_type);
}
int register_py_cursor_type(PyObject* mod) {
auto res = PyModule_AddObject(mod, PY_CURSOR_CLASS_NAME, (PyObject *)&py_cursor_type);
if (res < 0) {
Py_DECREF((PyObject *)&py_cursor_type);
}
return res;
}
py_cursor *make_py_cursor(std::unique_ptr<ignite::sql_statement> stmt) {
py_cursor* py_cursor_obj = PyObject_New(py_cursor, &py_cursor_type);
if (!py_cursor_obj)
return nullptr;
py_cursor_obj->m_statement = stmt.release();
return py_cursor_obj;
}