def replaceMetadata()

in connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraMetadataFunctions.scala [122:200]


  /**
   * Rewrites `plan` so that a Cassandra metadata function (TTL / WRITETIME, rendered as
   * `metaDataExpression.cql`) applied to a column is produced by the Cassandra source itself.
   *
   * The rewrite happens in three passes:
   *  1. every occurrence of `metaDataExpression` is replaced by an unresolved attribute named
   *     `cql(column)`, to be resolved by the next Catalyst Analyzer pass;
   *  2. every `DataSourceV2Relation` over a [[CassandraTable]] that has the column gets an extra
   *     schema field and output attribute with that same name;
   *  3. below any node still missing the unresolved attribute, each `Project` is extended with
   *     either a reference to it (when its input already carries it) or a typed null literal
   *     aliased to the same name.
   *
   * @param metaDataExpression the metadata function call to replace; its child must be an
   *                           [[AttributeReference]] naming a Cassandra column
   * @param plan               the logical plan containing `metaDataExpression`
   * @return the rewritten plan
   * @throws AssertionError           (via `assert`) if the function is not applied to an attribute
   * @throws IllegalArgumentException if no Cassandra relation in `plan` has the column
   * @throws AnalysisException        if the column is a primary key column
   */
  def replaceMetadata(metaDataExpression: CassandraMetadataFunction, plan: LogicalPlan)
  : LogicalPlan = {
    assert(metaDataExpression.child.isInstanceOf[AttributeReference],
      s"""Can only use Cassandra Metadata Functions on Attribute References,
         |found a ${metaDataExpression.child.getClass}""".stripMargin)

    val cassandraColumnName = metaDataExpression.child.asInstanceOf[AttributeReference].name
    val cassandraCql = s"${metaDataExpression.cql}($cassandraColumnName)"

    val cassandraTable = plan.collectFirst {
      case DataSourceV2Relation(table: CassandraTable, _, _, _, _)
        if table.tableDef.columnByName.contains(cassandraColumnName) => table
    }.getOrElse(throw new IllegalArgumentException(
      s"Unable to find Cassandra Source Relation for TTL/Writetime for column $cassandraColumnName"))

    val columnDef = cassandraTable.tableDef.columnByName(cassandraColumnName)

    if (columnDef.isPrimaryKeyColumn)
      throw new AnalysisException(s"Unable to use ${metaDataExpression.cql} function on non-normal column ${columnDef.columnName}")

    // Multi-cell columns expose their metadata as an array (presumably one entry per cell).
    // Every column named `cassandraCql` must use this single type, otherwise the real column and
    // the null placeholder below cannot be combined (e.g. unioned) by the Analyzer.
    val metadataType =
      if (columnDef.isMultiCell) ArrayType(metaDataExpression.dataType) else metaDataExpression.dataType

    // Used for CassandraRelation leaves, giving them a reference to the underlying metadata
    val cassandraAttributeReference = AttributeReference(cassandraCql, metadataType, nullable = true)()
    val cassandraField = StructField(cassandraCql, metadataType, nullable = true)

    // Used as a placeholder everywhere except leaf nodes, to be resolved by the Catalyst Analyzer
    val unResolvedAttributeReference = new NullableUnresolvedAttribute(cassandraCql)

    // Used for any leaf nodes that cannot produce a true metadata value. Cast to the same type as
    // the Cassandra-provided column (array for multi-cell columns) so both branches agree.
    val nullAttributeReference = Alias(functions.lit(null).cast(metadataType).expr, cassandraCql)()

    // Remove metadata expressions
    val metadataFunctionRemovedPlan = plan.transformAllExpressions {
      case expression: Expression if expression == metaDataExpression => unResolvedAttributeReference
    }

    // Add metadata to the Cassandra source relations that own the column
    val cassandraSourceModifiedPlan = metadataFunctionRemovedPlan.transform {
      case cassandraRelation @ DataSourceV2Relation(table: CassandraTable, _, _, _, _)
        if table.tableDef.columnByName.contains(cassandraColumnName) =>
        val modifiedCassandraTable = table.copy(optionalSchema = Some(table.schema().add(cassandraField)))
        cassandraRelation.copy(
          table = modifiedCassandraTable,
          output = cassandraRelation.output :+ cassandraAttributeReference)
    }

    // True when the plan's input already carries any of the attributes introduced above.
    def containsAnyReferenceToTTL(logicalPlan: LogicalPlan): Boolean = {
      val references = Seq(cassandraAttributeReference, nullAttributeReference, unResolvedAttributeReference)
      val input = logicalPlan.inputSet
      references.exists(input.contains)
    }

    /* Find the leaves of unsatisfied TTL references. Replace them either with a Cassandra TTL attribute
     * or a null if no Cassandra TTL is possible for that leaf. All other locations are marked as unresolved
     * for the next pass of the Analyzer */
    val fixedPlan = cassandraSourceModifiedPlan.transformDown {
      case node if node.missingInput.contains(unResolvedAttributeReference) =>
        node.mapChildren(_.transformUp {
          case child: Project =>
            if (containsAnyReferenceToTTL(child)) {
              // This node's input contains a value with the Cassandra TTL name; add an unresolved reference to it
              child.copy(child.projectList :+ unResolvedAttributeReference, child.child)
            } else {
              /* This node's input has no child reference to the Cassandra TTL we are adding, so add a
                 null column with the same name.
                 This is specifically for graphframes, which unions null references with C* columns */
              child.copy(child.projectList :+ nullAttributeReference, child.child)
            }
        })
    }

    fixedPlan
  }