in plugins/migration/copytable/python_copy_data_source.cpp [377:655]
bool PythonCopyDataSource::fetch_row(RowBuffer &rowbuffer) {
  // Fetches the next row from the Python DB-API cursor and copies each column
  // value into `rowbuffer`. Returns false when no more rows are available or
  // on a non-recoverable conversion error (the caller then skips the table).
  // Throws std::runtime_error on oversized blobs (when configured) and
  // std::logic_error on wrong date/time python types or unhandled MySQL types.
  // Holds the GIL for the whole call; every exit path releases it.
  PyGILState_STATE state = PyGILState_Ensure();
  if (!_cursor || _cursor == Py_None) {
    if (PyErr_Occurred())
      PyErr_Print();
    logError("No cursor object available while attempting to fetch a row. Skipping table %s\n", _table_name.c_str());
    PyGILState_Release(state);
    return false;
  }
  // fetchone() returns a new reference (sequence of column values) or Py_None
  // when the result set is exhausted.
  PyObject *row = PyObject_CallMethod(_cursor, (char *)"fetchone", NULL);
  if (row == NULL || row == Py_None) {
    Py_XDECREF(row); // BUGFIX: a returned Py_None was previously leaked
    PyGILState_Release(state);
    return false;
  }
  char *buffer;
  size_t buffer_len;
  PyObject *element;
  for (size_t i = 0; i < _column_count; ++i) {
    element = PySequence_GetItem(row, i); // new reference
    if (element == NULL) {
      // BUGFIX: previously unchecked; a NULL item would have been compared to
      // Py_None and later passed to Py_DECREF, crashing the process.
      if (PyErr_Occurred())
        PyErr_Print();
      logError("Could not get column value from fetched row in table %s. Skipping table!\n", _table_name.c_str());
      Py_DECREF(row);
      PyGILState_Release(state);
      return false;
    }
    if (rowbuffer.check_if_blob() || (*_columns)[i].is_long_data || (*_columns)[i].target_type == MYSQL_TYPE_GEOMETRY) {
      if (element == Py_None) {
        rowbuffer.finish_field(true);
        Py_DECREF(element);
        continue;
      }
      if (PyUnicode_Check(element)) {
        // Long text columns may arrive as unicode; re-encode as UTF-8 bytes so
        // the buffer protocol below can expose the raw data.
        PyObject *element_ref = element;
        element = PyUnicode_AsUTF8String(element);
        Py_DECREF(element_ref);
        if (element == NULL || PyErr_Occurred()) {
          if (PyErr_Occurred())
            PyErr_Print();
          logError(
            "An error occurred while encoding unicode data as UTF-8 in a long field object at column %s.%s. Skipping "
            "table!\n.",
            _table_name.c_str(), (*_columns)[i].source_name.c_str());
          Py_XDECREF(element);
          Py_DECREF(row); // BUGFIX: row leaked on this error path
          PyGILState_Release(state);
          return false;
        }
      }
      // NOTE: non-buffer objects used to be converted with the old-style
      // buffer API specified in PEP 249 (PyBuffer_FromObject); that conversion
      // is disabled pending WL-12709. BUGFIX: the disabled block still dropped
      // the element reference, releasing an object that was used right below
      // (use-after-free). PyObject_GetBuffer now reports unsupported objects.
      Py_buffer view;
      int res = PyObject_GetBuffer(element, &view, PyBUF_SIMPLE);
      if (res != 0) {
        if (PyErr_Occurred()) {
          PyErr_Print();
        }
        logError("Could not get a read buffer for the BLOB column %s.%s. Skipping table!\n", _table_name.c_str(),
                 (*_columns)[i].source_name.c_str());
        Py_DECREF(element);
        Py_DECREF(row); // BUGFIX: row leaked on this error path
        PyGILState_Release(state);
        return false;
      }
      if (view.len > _max_parameter_size) {
        if (_abort_on_oversized_blobs) {
          std::string message = base::strfmt("oversized blob found in table %s.%s, size: %lu", _schema_name.c_str(),
                                             _table_name.c_str(), (long unsigned int)view.len);
          // BUGFIX: release everything before throwing; the old code placed
          // PyBuffer_Release after the throw, making it unreachable, and
          // leaked element/row.
          PyBuffer_Release(&view);
          Py_DECREF(element);
          Py_DECREF(row);
          PyGILState_Release(state);
          throw std::runtime_error(message);
        }
        logError("Oversized blob found in table %s.%s, size: %lu", _schema_name.c_str(), _table_name.c_str(),
                 (long unsigned int)view.len);
        rowbuffer.finish_field(true); // store NULL instead of the oversized value
        PyBuffer_Release(&view);
        Py_DECREF(element);
        continue;
      }
      // Proceed to copy from the buffer.
      Py_ssize_t copied_bytes = 0;
      if (!view.len) { // empty buffer
        rowbuffer[i].buffer_length = *rowbuffer[i].length = (unsigned long)view.len;
        rowbuffer[i].buffer = nullptr;
      }
      while (copied_bytes < view.len) {
        Py_ssize_t this_pass_size = std::min(view.len - copied_bytes, (Py_ssize_t)_max_blob_chunk_size);
        // ---- Begin Section: This will fail if multiple passes are done. TODO: Fix this.
        if (_use_bulk_inserts) {
          if (rowbuffer[i].buffer_length)
            free(rowbuffer[i].buffer);
          *rowbuffer[i].length = (unsigned long)view.len;
          rowbuffer[i].buffer_length = (unsigned long)view.len;
          rowbuffer[i].buffer = malloc(view.len);
          memcpy(rowbuffer[i].buffer, view.buf, view.len);
        } else
          rowbuffer.send_blob_data((const char *)view.buf + copied_bytes, this_pass_size);
        // ---- End Section
        copied_bytes += this_pass_size;
      }
      rowbuffer.finish_field(false);
      PyBuffer_Release(&view);
      Py_DECREF(element);
      continue;
    }
    bool was_null = element == Py_None;
    enum enum_field_types target_type = (*_columns)[i].target_type;
    bool is_unsigned = (*_columns)[i].is_unsigned;
    switch (target_type) {
      case MYSQL_TYPE_TINY:
        rowbuffer.prepare_add_tiny(buffer, buffer_len);
        if (!was_null) {
          if (is_unsigned)
            *((unsigned char *)buffer) = (unsigned char)PyLong_AsLong(element);
          else
            *buffer = (char)PyLong_AsLong(element);
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_YEAR:
      case MYSQL_TYPE_SHORT:
        rowbuffer.prepare_add_short(buffer, buffer_len);
        if (!was_null) {
          if (is_unsigned)
            *((unsigned short *)buffer) = (unsigned short)PyLong_AsLong(element);
          else
            *((short *)buffer) = (short)PyLong_AsLong(element);
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_INT24:
      case MYSQL_TYPE_LONG:
        rowbuffer.prepare_add_long(buffer, buffer_len);
        if (!was_null) {
          if (is_unsigned)
            *((unsigned long *)buffer) = PyLong_AsUnsignedLongMask(element);
          else
            *((long *)buffer) = PyLong_AsLong(element);
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_LONGLONG:
        rowbuffer.prepare_add_bigint(buffer, buffer_len);
        if (!was_null) {
          if (is_unsigned)
            *((unsigned long long *)buffer) =
              PyLong_Check(element) ? PyLong_AsUnsignedLongLongMask(element) : PyLong_AsUnsignedLongLong(element);
          else
            *((long long *)buffer) = PyLong_AsLongLong(element);
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_FLOAT:
        rowbuffer.prepare_add_float(buffer, buffer_len);
        if (!was_null)
          *((float *)buffer) = (float)PyFloat_AsDouble(element);
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_DOUBLE:
        rowbuffer.prepare_add_double(buffer, buffer_len);
        if (!was_null)
          *((double *)buffer) = PyFloat_AsDouble(element);
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_TIME:
      case MYSQL_TYPE_DATE:
      case MYSQL_TYPE_NEWDATE:
      case MYSQL_TYPE_DATETIME:
      case MYSQL_TYPE_TIMESTAMP:
        rowbuffer.prepare_add_time(buffer, buffer_len);
        // The select query can yield these fields as Unicode/strings or as datetime.* objects
        if (was_null)
          ((MYSQL_TIME *)buffer)->time_type = MYSQL_TIMESTAMP_NONE;
        else {
          if (PyObject_HasAttrString(element, "isoformat")) // element is a python datetime.* object
          {
            // Will return an ISO 8601 string representation of the date/time/datetime object
            PyObject *old_ref = element;
            element = PyObject_CallMethod(element, (char *)"isoformat", NULL);
            Py_DECREF(old_ref);
          }
          // BUGFIX: guard against a NULL result from the isoformat() call and
          // drop the duplicated `PyUnicode_Check || PyUnicode_Check` test
          // (leftover from a Python 2 PyString_Check).
          if (element != NULL && PyUnicode_Check(element)) // element is a string (sqlite sends time data as strings)
          {
            std::string elem_str;
            pystring_to_string(element, elem_str);
            BaseConverter::convert_date_time(elem_str.c_str(), (MYSQL_TIME *)buffer, rowbuffer[i].buffer_type);
          } else {
            if (PyErr_Occurred())
              PyErr_Print();
            Py_XDECREF(element); // BUGFIX: element/row leaked on this throw path
            Py_DECREF(row);
            PyGILState_Release(state);
            throw std::logic_error(
              base::strfmt("Wrong python type for date/time/datetime column %s found in table %s.%s: "
                           "A string or datetime.* object is expected",
                           (*_columns)[i].source_name.c_str(), _schema_name.c_str(), _table_name.c_str()));
          }
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_NEWDECIMAL:
      case MYSQL_TYPE_STRING:
      case MYSQL_TYPE_VAR_STRING:
      case MYSQL_TYPE_BIT: {
        unsigned long *length;
        rowbuffer.prepare_add_string(buffer, buffer_len, length);
        if (!was_null) {
          // Target type can be MYSQL_TYPE_STRING for decimal columns and yet values can be ints or floats
          // If that's the case, get str(element) for insertion.
          // BUGFIX: removed the duplicated PyLong_Check in this condition.
          if (PyFloat_Check(element) || PyLong_Check(element)) {
            PyObject *elem_ref = element;
            element = PyObject_Str(element);
            Py_DECREF(elem_ref);
          }
          // BUGFIX: guard against PyObject_Str returning NULL; also removed
          // the unreachable duplicate `else if (PyUnicode_Check(element))`
          // branch that repeated the first condition verbatim.
          if (element != NULL && PyUnicode_Check(element)) {
            PyObject *ref = PyUnicode_AsUTF8String(element);
            if (ref) {
              const char *s;
              Py_ssize_t len;
              s = PyUnicode_AsUTF8AndSize(ref, &len);
              if (buffer_len < (size_t)len) {
                logError("Truncating data in column %s from %lul to %lul. Possible loss of data.\n",
                         (*_columns)[i].source_name.c_str(), (long unsigned int)len, (long unsigned int)buffer_len);
                len = buffer_len;
              }
              memcpy(buffer, s, len);
              *length = (unsigned long)len;
              Py_DECREF(ref);
            } else {
              logError("Could not convert unicode string to UTF-8\n");
              Py_DECREF(element);
              Py_DECREF(row); // BUGFIX: element/row leaked on this error path
              PyGILState_Release(state);
              return false;
            }
          } else // Neither a PyUnicode nor a PyString object. This should be an error:
          {
            logError(
              "The python object for column %s is neither a PyUnicode nor a PyString object. Skipping table...\n",
              (*_columns)[i].source_name.c_str());
            Py_XDECREF(element);
            Py_DECREF(row); // BUGFIX: element/row leaked on this error path
            PyGILState_Release(state);
            return false;
          }
        }
        rowbuffer.finish_field(was_null);
        break;
      }
      case MYSQL_TYPE_NULL:
        rowbuffer[i].buffer_length = 0;
        break;
      default:
        Py_DECREF(element);
        Py_DECREF(row); // BUGFIX: row leaked on this throw path
        PyGILState_Release(state);
        throw std::logic_error(base::strfmt("Unhandled MySQL type %i for column '%s'", (*_columns)[i].target_type,
                                            (*_columns)[i].target_name.c_str()));
    }
    Py_DECREF(element);
  }
  Py_DECREF(row); // BUGFIX: the fetched row object was leaked on every successful call
  PyGILState_Release(state);
  return true;
}