in measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala [171:210]
/**
 * Checks that this measure is configured consistently before it is executed.
 *
 * The checks run in the following order and stop at the first failure:
 *  1. the expression option is defined;
 *  1. the expression, once flattened, is not empty;
 *  1. the reference source name is neither null nor empty;
 *  1. the reference source is known to the Spark catalog;
 *  1. every source column named by the expression exists in the data source table;
 *  1. every reference column named by the expression exists in the reference table.
 *
 * Failures surface as `AssertionError` (raised directly for the first check and
 * through `assert` for the others).
 */
override def validate(): Unit = {
  val definedExpr = exprOpt match {
    case Some(value) => value
    case None => throw new AssertionError(s"'$Expression' must be defined.")
  }
  assert(
    definedExpr.flatten.nonEmpty,
    s"'$Expression' must not be empty or of invalid type.")

  // The name is checked before the catalog lookup so that a missing name gets its own message.
  assert(
    !StringUtil.isNullOrEmpty(refSource),
    s"'$ReferenceSourceStr' must not be null, empty or of invalid type.")
  assert(
    sparkSession.catalog.tableExists(refSource),
    s"Reference source with name '$refSource' does not exist.")

  val datasourceName = measureParam.getDataSource
  val sourceColumns = sparkSession.read.table(datasourceName).columns.toSet
  val referenceColumns = sparkSession.read.table(refSource).columns.toSet

  // Identical mappings are collapsed; the same column may still appear in several mappings.
  val mappings = definedExpr.map(toAccuracyExpr).distinct
  val missingInSource = mappings.map(_.sourceCol).filterNot(sourceColumns.contains)
  val missingInReference = mappings.map(_.refCol).filterNot(referenceColumns.contains)

  assert(
    missingInSource.isEmpty,
    s"Column(s) [${missingInSource.mkString(", ")}] " +
      s"do not exist in data set with name '$datasourceName'")
  assert(
    missingInReference.isEmpty,
    s"Column(s) [${missingInReference.mkString(", ")}] " +
      s"do not exist in reference data set with name '$refSource'")
}