override def write()

in phoenix5-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala [32:89]


  override def write(statement: PreparedStatement): Unit = {
    // Make sure we at least line up in size
    if (upsertValues.length != columnMetaDataList.length) {
      throw new UnsupportedOperationException(
        s"Upsert values ($upsertValues) do not match the specified columns ($columnMetaDataList)"
      )
    }

    // Correlate each value (v) to a column type (c) and an index (i)
    upsertValues.zip(columnMetaDataList).zipWithIndex.foreach {
      case ((v, c), i) => {
        if (v != null) {

          // Both Java and Joda dates used to work in 4.2.3, but now they must be java.sql.Date
          // Can override any other types here as needed
          val (finalObj, finalType) = v match {
            case dt: DateTime => (new java.sql.Date(dt.getMillis), PDate.INSTANCE)
            case d: java.util.Date => (new java.sql.Date(d.getTime), PDate.INSTANCE)
            case _ => (v, c.getPDataType)
          }

          // Helper method to create an SQL array for a specific PDatatype, and set it on the statement
          def setArrayInStatement(obj: Array[AnyRef]): Unit = {
            // Create a java.sql.Array, need to lookup the base sql type name
            val sqlArray = statement.getConnection.createArrayOf(
              PDataType.arrayBaseType(finalType).getSqlTypeName,
              obj
            )
            statement.setArray(i + 1, sqlArray)
          }

          // Determine whether to save as an array or object
          (finalObj, finalType) match {
            case (obj: Array[AnyRef], _) => setArrayInStatement(obj)
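            // Note: the collection element types below are erased at runtime, so the AnyVal
            // patterns match any ArrayBuffer/WrappedArray; mapping through asInstanceOf[AnyRef]
            // simply boxes the elements, so either branch yields an Array[AnyRef].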
            case (obj: mutable.ArrayBuffer[AnyVal], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]).toArray)
            case (obj: mutable.ArrayBuffer[AnyRef], _) => setArrayInStatement(obj.toArray)
            case (obj: mutable.WrappedArray[AnyVal], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]).toArray)
            case (obj: mutable.WrappedArray[AnyRef], _) => setArrayInStatement(obj.toArray)
            case (obj: Array[Int], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
            case (obj: Array[Long], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
            case (obj: Array[Char], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
            case (obj: Array[Short], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
            case (obj: Array[Float], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
            case (obj: Array[Double], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
            // PVarbinary and PBinary come in as Array[Byte] but they're SQL objects
            case (obj: Array[Byte], _ : PVarbinary) => statement.setObject(i + 1, obj)
            case (obj: Array[Byte], _ : PBinary) => statement.setObject(i + 1, obj)
            // Otherwise set as array type
            case (obj: Array[Byte], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
            case _ => statement.setObject(i + 1, finalObj)
          }
        } else {
          statement.setNull(i + 1, c.getSqlType)
        }
      }
    }
  }
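
For context, a minimal driver sketch (not part of the excerpt) showing how write() can be exercised by hand. It assumes PhoenixRecordWritable is constructed from the same List[ColumnInfo] that backs columnMetaDataList and exposes an add(...) helper that appends to upsertValues; the table name, column names, and JDBC URL are hypothetical, and the UPSERT SQL is built with org.apache.phoenix.util.QueryUtil.

  import java.sql.DriverManager
  import scala.collection.JavaConverters._
  import org.apache.phoenix.util.{ColumnInfo, QueryUtil}

  // Hypothetical schema: ColumnInfo pairs a column name with its java.sql.Types code
  val columns = List(
    new ColumnInfo("ID", java.sql.Types.BIGINT),
    new ColumnInfo("NAME", java.sql.Types.VARCHAR)
  )

  val conn = DriverManager.getConnection("jdbc:phoenix:localhost")   // hypothetical URL
  val upsertSql = QueryUtil.constructUpsertStatement("EXAMPLE_TABLE", columns.asJava)
  val stmt = conn.prepareStatement(upsertSql)

  val record = new PhoenixRecordWritable(columns)
  record.add(1L)              // bound to ID
  record.add("first row")     // bound to NAME
  record.write(stmt)          // the method above binds each value (or NULL) by position
  stmt.executeUpdate()
  conn.commit()
  conn.close()

In the connector itself this binding is normally driven per record by the Phoenix output format's record writer rather than invoked directly as above.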