in spark/core/src/main/scala/org/elasticsearch/spark/serialization/ScalaValueWriter.scala [52:175]
// Serializes Scala-specific values; anything unrecognized falls through to the parent (Java) writer.
private def doWriteScala(value: Any, generator: Generator, parentField: String): Result = {
value match {
// null-like values (null, an unset Option, the Unit value) are written as JSON null
case null | None | () => generator.writeNull()
// empty lists become empty JSON arrays
case Nil =>
generator.writeBeginArray(); generator.writeEndArray()
// a defined Option is unwrapped and its payload written in its place
case Some(s: AnyRef) => return doWrite(s, generator, parentField)
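// For illustration (example values assumed, not from the original source): Some(42) is
// unwrapped and written as 42, while null, None and () all end up as JSON null.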
// Maps become JSON objects; each key is stringified and run through the include/exclude filter
case m: Map[_, _] => {
generator.writeBeginObject()
for ((k, v) <- m) {
if (shouldKeep(parentField, k.toString)) {
// decide whether the entry carries a value: Unit and unset Options do not,
// everything else (including null, which Option(v) turns into None) does
val hasValue = Option(v) match {
case Some(()) => false
case Some(None) => false
case Some(_) => true
case None => true
}
if (hasValue || hasWriteNullValues) {
generator.writeFieldName(k.toString)
val result = doWrite(v, generator, k.toString)
if (!result.isSuccesful) {
return result
}
}
}
}
generator.writeEndObject()
}
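// For illustration (assumed example): Map("name" -> "es", "tags" -> List("a", "b"), "alias" -> None)
// would come out as {"name":"es","tags":["a","b"]}; the "alias" field is only emitted (as null)
// when hasWriteNullValues is enabled.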
// any other Traversable (List, Seq, Set, ...) is written as a JSON array
case i: Traversable[_] => {
generator.writeBeginArray()
for (v <- i) {
val result = doWrite(v, generator, parentField)
if (!result.isSuccesful) {
return result
}
}
generator.writeEndArray()
}
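// For illustration (assumed example): Seq(1, 2, 3) becomes [1,2,3]; nested values recurse
// through doWrite, so List(Map("a" -> 1)) becomes [{"a":1}].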
// byte arrays are passed straight through as binary content
case b: Array[Byte] => {
generator.writeBinary(b)
}
// any other array is written as a JSON array, element by element
case i: Array[_] => {
generator.writeBeginArray()
for (v <- i) {
val result = doWrite(v, generator, parentField)
if (!result.isSuccesful) {
return result
}
}
generator.writeEndArray()
}
case p: Product => {
// case classes are unpacked through RU.caseClassValues and written with their field values
if (RU.isCaseClass(p)) {
val result = doWrite(RU.caseClassValues(p), generator, parentField)
if (!result.isSuccesful) {
return result
}
} else {
// any other Product (tuples and the like) is treated as a list/array
generator.writeBeginArray()
for (t <- p.productIterator) {
val result = doWrite(t.asInstanceOf[AnyRef], generator, parentField)
if (!result.isSuccesful) {
return result
}
}
generator.writeEndArray()
}
}
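// For illustration (assumed example): the tuple ("a", 1) is written as ["a",1], whereas a
// case class such as case class Person(name: String, age: Int) goes through RU.caseClassValues
// and is written as an object, e.g. {"name":"jo","age":3}.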
case _ => {
// guard against this being called by accident on a DataFrame/SchemaRDD (it happens)
if (value.getClass().getName().startsWith("org.apache.spark.sql.")) {
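// The supported route for SQL types is the dedicated package, roughly (index name is illustrative):
//   import org.elasticsearch.spark.sql._
//   dataFrame.saveToEs("my-index/my-type")
// which preserves the schema instead of discarding it.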
throw new EsHadoopIllegalArgumentException("Spark SQL types are not handled through basic RDD saveToEs() calls; typically this is a mistake (as the SQL schema will be ignored). Use 'org.elasticsearch.spark.sql' package instead")
}
val result = super.doWrite(value, generator, parentField)
// The normal JDK types failed, so try the JavaBean route last. The JavaBean logic accepts
// just about anything, even if it's not a real JavaBean. Check whether the value that failed
// is the same value we're about to treat as a bean: if it isn't, the last call probably
// failed on a subfield of the current value that couldn't be serialized, and there's a
// chance we would then treat a container object (map, list) as a JavaBean, which is
// improper. In those cases skip the JavaBean handling and just return the result.
if (!result.isSuccesful && result.getUnknownValue == value) {
if (!beanTracker.contains(value) && RU.isJavaBean(value)) {
// Recursion warning:
// The object we are handed may have a getter that returns itself, or contain itself at
// any level of nesting. That can cause stack overflow errors during serialization.
// Guard against this:
// First, keep track of the objects we've seen and don't try to serialize one while we're
// already serializing it.
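// For illustration (hypothetical type): a bean like
//   class Node { def getSelf: Node = this }
// would otherwise keep expanding into itself through its getter.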
beanTracker.add(value)
try {
// Second, try to detect the immediate case of self reference and break out early to
// avoid a stack overflow.
val asMap = RU.javaBeanAsMap(value).filterNot(e => e._2 == value)
val beanResult = doWrite(asMap, generator, parentField)
return beanResult
} finally {
// Third, allow the same bean to be used again as long as it doesn't recurse into itself.
// This doubles as clean-up logic, avoiding the need to clear the set on every write call.
beanTracker.remove(value)
}
} else {
return result
}
} else {
return result
}
}
}
Result.SUCCESFUL()
}