bool PythonCopyDataSource::fetch_row()

in plugins/migration/copytable/python_copy_data_source.cpp [377:655]


bool PythonCopyDataSource::fetch_row(RowBuffer &rowbuffer) {
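  // Acquire the GIL first: fetch_row() may run on a thread that does not currently hold it,
  // and everything below manipulates Python objects through the C API.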
  PyGILState_STATE state = PyGILState_Ensure();
  if (!_cursor || _cursor == Py_None) {
    if (PyErr_Occurred())
      PyErr_Print();
    logError("No cursor object available while attempting to fetch a row. Skipping table %s\n", _table_name.c_str());
    PyGILState_Release(state);
    return false;
  }

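  // PEP 249: cursor.fetchone() returns the next row as a sequence, or None once the result
  // set is exhausted; NULL means the call itself raised an exception.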
  PyObject *row = PyObject_CallMethod(_cursor, (char *)"fetchone", NULL);
  if (row == NULL || row == Py_None) {
    Py_XDECREF(row); // drop the Py_None reference; a no-op when fetchone() failed and row is NULL
    PyGILState_Release(state);
    return false;
  }

  char *buffer;
  size_t buffer_len;
  PyObject *element;

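  // Marshal every column of the fetched row into the corresponding MySQL bind buffer.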
  for (size_t i = 0; i < _column_count; ++i) {
    element = PySequence_GetItem(row, i);
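    // BLOB, long-data and geometry columns go through the Python buffer protocol and are
    // copied (or streamed in chunks) below; all other types are bound in place by the switch.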
    if (rowbuffer.check_if_blob() || (*_columns)[i].is_long_data || (*_columns)[i].target_type == MYSQL_TYPE_GEOMETRY) {
      if (element == Py_None) {
        rowbuffer.finish_field(true);
        Py_DECREF(element);
        continue;
      }
      if (PyUnicode_Check(element)) {
        PyObject *element_ref = element;
        element = PyUnicode_AsUTF8String(element);
        Py_DECREF(element_ref);
        if (element == NULL || PyErr_Occurred()) {
          if (PyErr_Occurred())
            PyErr_Print();
          logError(
            "An error occurred while encoding unicode data as UTF-8 in a long field object at column %s.%s. "
            "Skipping table!\n",
            _table_name.c_str(), (*_columns)[i].source_name.c_str());
          PyGILState_Release(state);
          return false;
        }
      } else if (!PyObject_CheckBuffer(element)) {
        // PEP 249 specifies the (old-style) buffer interface for BLOB data. The conversion of other
        // objects is pending (FIXME: WL-12709); until it lands, keep the original reference alive so
        // the buffer check below can report anything that cannot be read as raw bytes.
        // PyObject *element_copy = element;
        // element = PyBuffer_FromObject(element, 0, Py_END_OF_BUFFER);
        // Py_DECREF(element_copy);
        if (PyErr_Occurred()) {
          PyErr_Print();
          Py_XDECREF(element);
          logError("Unexpected value for BLOB object at column %s.%s. Skipping table!\n", _table_name.c_str(),
                   (*_columns)[i].source_name.c_str());
          PyGILState_Release(state);
          return false;
        }
      }
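      // Obtain a raw byte view of the BLOB via the buffer protocol.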
      Py_buffer view;
      int res = PyObject_GetBuffer(element, &view, PyBUF_SIMPLE);
      if (res != 0) {
        if (PyErr_Occurred()) {
          PyErr_Print();
        }
        logError("Could not get a read buffer for the BLOB column %s.%s. Skipping table!\n", _table_name.c_str(),
                 (*_columns)[i].source_name.c_str());
        Py_DECREF(element);
        PyGILState_Release(state);
        return false;
      }
      if (view.len > _max_parameter_size) {
        if (_abort_on_oversized_blobs) {
          std::string message = base::strfmt("oversized blob found in table %s.%s, size: %lu", _schema_name.c_str(),
                                             _table_name.c_str(), (long unsigned int)view.len);
          // Release the buffer view and the element before unwinding.
          PyBuffer_Release(&view);
          Py_DECREF(element);
          PyGILState_Release(state);
          throw std::runtime_error(message);
        } else {
          logError("Oversized blob found in table %s.%s, size: %lu\n", _schema_name.c_str(), _table_name.c_str(),
                   (long unsigned int)view.len);
          rowbuffer.finish_field(true);
          Py_DECREF(element);
          PyBuffer_Release(&view);
          continue;
        }
      } else { // Proceed to copy from the buffer
        Py_ssize_t copied_bytes = 0;
        if (!view.len) { // empty buffer
          rowbuffer[i].buffer_length = *rowbuffer[i].length = (unsigned long)view.len;
          rowbuffer[i].buffer = nullptr;
        }
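        // Push the BLOB in chunks of at most _max_blob_chunk_size bytes; in bulk-insert mode the
        // whole value is copied into a freshly allocated buffer instead.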
        while (copied_bytes < view.len) {
          Py_ssize_t this_pass_size = std::min(view.len - copied_bytes, (Py_ssize_t)_max_blob_chunk_size);
          // ---- Begin Section: This will fail if multiple passes are done. TODO: Fix this.
          if (_use_bulk_inserts) {
            if (rowbuffer[i].buffer_length)
              free(rowbuffer[i].buffer);

            *rowbuffer[i].length = (unsigned long)view.len;
            rowbuffer[i].buffer_length = (unsigned long)view.len;
            rowbuffer[i].buffer = malloc(view.len);

            memcpy(rowbuffer[i].buffer, view.buf, view.len);
          } else
            rowbuffer.send_blob_data((const char*)view.buf + copied_bytes, this_pass_size);
          // ---- End Section
          copied_bytes += this_pass_size;
        }
        rowbuffer.finish_field(false);
        PyBuffer_Release(&view);
        Py_DECREF(element);
        continue;
      }
    }
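    // Scalar (non-blob) columns: bind the Python value according to the target MySQL field type.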
    bool was_null = element == Py_None;
    enum enum_field_types target_type = (*_columns)[i].target_type;
    bool is_unsigned = (*_columns)[i].is_unsigned;
    switch (target_type) {
      case MYSQL_TYPE_TINY:
        rowbuffer.prepare_add_tiny(buffer, buffer_len);
        if (!was_null) {
          if (is_unsigned)
            *((unsigned char *)buffer) = (unsigned char)PyLong_AsLong(element);
          else
            *buffer = (char)PyLong_AsLong(element);
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_YEAR:
      case MYSQL_TYPE_SHORT:
        rowbuffer.prepare_add_short(buffer, buffer_len);
        if (!was_null) {
          if (is_unsigned)
            *((unsigned short *)buffer) = (unsigned short)PyLong_AsLong(element);
          else
            *((short *)buffer) = (short)PyLong_AsLong(element);
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_INT24:
      case MYSQL_TYPE_LONG:
        rowbuffer.prepare_add_long(buffer, buffer_len);
        if (!was_null) {
          if (is_unsigned)
            *((unsigned long *)buffer) = PyLong_AsUnsignedLongMask(element);
          else
            *((long *)buffer) = PyLong_AsLong(element);
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_LONGLONG:
        rowbuffer.prepare_add_bigint(buffer, buffer_len);
        if (!was_null) {
          if (is_unsigned)
            *((unsigned long long *)buffer) =
              PyLong_Check(element) ? PyLong_AsUnsignedLongLongMask(element) : PyLong_AsUnsignedLongLong(element);
          else
            *((long long *)buffer) = PyLong_AsLongLong(element);
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_FLOAT:
        rowbuffer.prepare_add_float(buffer, buffer_len);
        if (!was_null)
          *((float *)buffer) = (float)PyFloat_AsDouble(element);
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_DOUBLE:
        rowbuffer.prepare_add_double(buffer, buffer_len);
        if (!was_null)
          *((double *)buffer) = PyFloat_AsDouble(element);
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_TIME:
      case MYSQL_TYPE_DATE:
      case MYSQL_TYPE_NEWDATE:
      case MYSQL_TYPE_DATETIME:
      case MYSQL_TYPE_TIMESTAMP:
        rowbuffer.prepare_add_time(buffer, buffer_len);
        // The select query can yield these fields as Unicode/strings or as datetime.* objects
        if (element == Py_None) // NULL value in the source row
          ((MYSQL_TIME *)buffer)->time_type = MYSQL_TIMESTAMP_NONE;
        else {
          if (PyObject_HasAttrString(element, "isoformat")) // element is a python datetime.* object
          {
            PyObject *old_ref = element;
            // isoformat() returns an ISO 8601 string representation of the date/time/datetime object.
            element = PyObject_CallMethod(element, (char *)"isoformat", NULL);
            Py_DECREF(old_ref);
          }
          if (PyUnicode_Check(element)) // element is a string (sqlite sends time data as strings)
          {
            std::string elem_str;
            pystring_to_string(element, elem_str);
            BaseConverter::convert_date_time(elem_str.c_str(), (MYSQL_TIME *)buffer, rowbuffer[i].buffer_type);
          } else {
            PyGILState_Release(state);
            throw std::logic_error(
              base::strfmt("Wrong python type for date/time/datetime column %s found in table %s.%s: "
                           "A string or datetime.* object is expected",
                           (*_columns)[i].source_name.c_str(), _schema_name.c_str(), _table_name.c_str()));
          }
        }

        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_NEWDECIMAL:
      case MYSQL_TYPE_STRING:
      case MYSQL_TYPE_VAR_STRING:
      case MYSQL_TYPE_BIT:
        unsigned long *length;
        rowbuffer.prepare_add_string(buffer, buffer_len, length);

        if (!was_null) {
          // The target type can be MYSQL_TYPE_STRING for decimal columns and yet values can arrive as ints or floats.
          // If that's the case, use str(element) for the insertion:
          if (PyFloat_Check(element) || PyLong_Check(element)) {
            PyObject *elem_ref = element;
            element = PyObject_Str(element);
            Py_DECREF(elem_ref);
          }

          if (PyUnicode_Check(element)) {
            // Encode to UTF-8; PyUnicode_AsUTF8String() returns a new bytes object.
            PyObject *ref = PyUnicode_AsUTF8String(element);
            if (ref) {
              char *s;
              Py_ssize_t len;
              PyBytes_AsStringAndSize(ref, &s, &len);
              if (buffer_len < (size_t)len) {
                logError("Truncating data in column %s from %lu to %lu. Possible loss of data.\n",
                         (*_columns)[i].source_name.c_str(), (long unsigned int)len, (long unsigned int)buffer_len);
                len = buffer_len;
              }
              memcpy(buffer, s, len);
              *length = (unsigned long)len;
              Py_DECREF(ref);
            } else {
              logError("Could not convert unicode string to UTF-8\n");
              PyGILState_Release(state);
              return false;
            }
          } else if (PyBytes_Check(element)) { // raw bytes coming from the source driver
            char *s;
            Py_ssize_t len;
            PyBytes_AsStringAndSize(element, &s, &len);
            if (buffer_len < (size_t)len) {
              logError("Truncating data in column %s from %lu to %lu. Possible loss of data.\n",
                       (*_columns)[i].source_name.c_str(), (long unsigned int)len, (long unsigned int)buffer_len);
              len = buffer_len;
            }
            memcpy(buffer, s, len);
            *length = (unsigned long)len;
          } else // Neither a unicode nor a bytes object. This is an error:
          {
            logError(
              "The python object for column %s is neither a unicode nor a bytes object. Skipping table...\n",
              (*_columns)[i].source_name.c_str());
            PyGILState_Release(state);
            return false;
          }
        }
        rowbuffer.finish_field(was_null);
        break;
      case MYSQL_TYPE_NULL:
        rowbuffer[i].buffer_length = 0;
        break;
      default:
        Py_DECREF(element);
        PyGILState_Release(state);
        throw std::logic_error(base::strfmt("Unhandled MySQL type %i for column '%s'", (*_columns)[i].target_type,
                                            (*_columns)[i].target_name.c_str()));
    }
    Py_DECREF(element);
  }
  Py_DECREF(row); // the row sequence returned by fetchone() is a new reference
  PyGILState_Release(state);
  return true;
}
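
Usage note: the sketch below shows how a caller might drive fetch_row() to drain a result set. It is only an illustration under stated assumptions: the header name, ExampleRowSink and copy_rows() are made-up stand-ins, not the actual copytable driver code, which also handles resume points, row counting and error reporting.

// Minimal sketch, assuming the copytable headers are available and that the source's query
// has already been sent and the row buffer prepared for the target table's columns.
#include "python_copy_data_source.h" // assumed project header providing PythonCopyDataSource and RowBuffer

struct ExampleRowSink {              // hypothetical stand-in for whatever consumes the bound rows
  void consume(RowBuffer &row) { /* hand the bound buffers to the target writer */ }
};

static size_t copy_rows(PythonCopyDataSource &source, RowBuffer &rowbuffer, ExampleRowSink &sink) {
  size_t copied = 0;
  // fetch_row() returns false both when the cursor is exhausted and on a table-level error
  // (errors are logged inside fetch_row), so a plain while loop drains the result set.
  while (source.fetch_row(rowbuffer)) {
    sink.consume(rowbuffer);
    ++copied;
  }
  return copied;
}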