in connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraMetadataFunctions.scala [122:200]
/**
 * Rewrites `plan` so that the Cassandra metadata function `metaDataExpression` is served by an
 * extra column read from the Cassandra source instead of being evaluated as an expression.
 *
 * The rewrite is done in three passes:
 *  1. every occurrence of `metaDataExpression` is replaced by an unresolved attribute named
 *     `<cql>(<column>)`;
 *  2. every `CassandraTable` relation that has the column gets a field of that name appended to
 *     its schema and to its output;
 *  3. below each node whose missing input contains the placeholder, every `Project` is extended
 *     with either the placeholder (when its input already carries the metadata column) or a
 *     typed null column of the same name.
 *
 * For multi-cell columns the metadata column is typed as an array of the function's data type.
 * The same type is used for the Cassandra attribute, the schema field and the null placeholder,
 * so that branches with and without a Cassandra source expose identically typed columns.
 *
 * @param metaDataExpression metadata function to replace; its child must be an
 *                           `AttributeReference`
 * @param plan               logical plan in which the function is used
 * @return the rewritten plan; the placeholder attributes are left unresolved for the next
 *         pass of the Analyzer
 * @throws AssertionError           if the function's child is not an `AttributeReference`
 * @throws IllegalArgumentException if no `CassandraTable` relation in `plan` has the column
 * @throws AnalysisException        if the column is a primary key column
 */
def replaceMetadata(metaDataExpression: CassandraMetadataFunction, plan: LogicalPlan)
: LogicalPlan = {
  val cassandraColumnName = metaDataExpression.child match {
    case attribute: AttributeReference => attribute.name
    case other =>
      // Same error type and message as Predef.assert, but not elidable: a non-attribute child
      // always fails here instead of surfacing later as a ClassCastException.
      throw new AssertionError("assertion failed: " +
        s"""Can only use Cassandra Metadata Functions on Attribute References,
           |found a ${other.getClass}""".stripMargin)
  }
  val cassandraCql = s"${metaDataExpression.cql}($cassandraColumnName)"

  // First Cassandra relation in the plan that actually owns the referenced column.
  val cassandraTable = plan.collectFirst {
    case DataSourceV2Relation(table: CassandraTable, _, _, _, _)
      if table.tableDef.columnByName.contains(cassandraColumnName) => table
  }.getOrElse(throw new IllegalArgumentException(
    s"Unable to find Cassandra Source Relation for TTL/Writetime for column $cassandraColumnName"))

  val columnDef = cassandraTable.tableDef.columnByName(cassandraColumnName)
  if (columnDef.isPrimaryKeyColumn) {
    throw new AnalysisException(s"Unable to use ${metaDataExpression.cql} function on non-normal column ${columnDef.columnName}")
  }

  // Multi-cell columns expose their metadata as an array. Computed once so that every
  // column carrying the metadata name below agrees on the type.
  val metadataType =
    if (columnDef.isMultiCell) ArrayType(metaDataExpression.dataType)
    else metaDataExpression.dataType

  // Used for CassandraRelation leaves, giving them a reference to the underlying metadata.
  val cassandraAttributeReference = AttributeReference(cassandraCql, metadataType, nullable = true)()
  val cassandraField = StructField(cassandraCql, metadataType, nullable = true)
  // Placeholder for everywhere except leaf nodes, to be resolved by the Catalyst Analyzer.
  val unResolvedAttributeReference = new NullableUnresolvedAttribute(cassandraCql)
  // Used for any leaf nodes that cannot produce a true metadata value.
  val nullAttributeReference = Alias(functions.lit(null).cast(metadataType).expr, cassandraCql)()

  // Pass 1: remove the metadata expressions, leaving the unresolved placeholder behind.
  val metadataFunctionRemovedPlan = plan.transformAllExpressions {
    case expression: Expression if expression == metaDataExpression => unResolvedAttributeReference
  }

  // Pass 2: make every matching Cassandra source produce the metadata column.
  val cassandraSourceModifiedPlan = metadataFunctionRemovedPlan.transform {
    case cassandraRelation @ DataSourceV2Relation(table: CassandraTable, _, _, _, _)
      if table.tableDef.columnByName.contains(cassandraColumnName) =>
      val modifiedCassandraTable =
        table.copy(optionalSchema = Some(table.schema().add(cassandraField)))
      cassandraRelation.copy(
        table = modifiedCassandraTable,
        output = cassandraRelation.output :+ cassandraAttributeReference
      )
  }

  // True when the node's input already carries a column standing for the metadata value.
  def inputProvidesMetadata(logicalPlan: LogicalPlan): Boolean = {
    val references = Seq(cassandraAttributeReference, nullAttributeReference, unResolvedAttributeReference)
    val input = logicalPlan.inputSet
    references.exists(input.contains)
  }

  /* Pass 3: find the nodes with unsatisfied metadata references and thread the column through
   * every Project beneath them. A Project whose input already has the metadata column forwards
   * it via the unresolved placeholder (resolved on the next pass of the Analyzer); a Project
   * whose input lacks it gets a null column with the same name. The null case exists
   * specifically for graphframes, which unions null references with C* columns. */
  cassandraSourceModifiedPlan.transformDown {
    case node if node.missingInput.contains(unResolvedAttributeReference) =>
      node.mapChildren(_.transformUp {
        case project: Project =>
          val metadataColumn =
            if (inputProvidesMetadata(project)) unResolvedAttributeReference
            else nullAttributeReference
          project.copy(projectList = project.projectList :+ metadataColumn)
      })
  }
}