in connectors/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala [296:418]
/**
 * Builds the TsFile expression for a single comparison `nodeName <op> nodeValue`.
 *
 * The column type is looked up in `schema`. `nodeValue` is cast to the matching boxed
 * Java type (or converted with `toString` and wrapped in a `Binary` for string columns)
 * and handed to `ValueFilterApi` together with the column's position in `schema` and
 * the corresponding `TSDataType`.
 *
 * Boolean and string columns only support `FilterTypes.Eq`; integer, long, float and
 * double columns support `Eq`, `Gt`, `GtEq`, `Lt` and `LtEq`.
 *
 * @param schema     schema whose field names and data types are used to resolve `nodeName`
 * @param nodeName   name of the column the comparison applies to
 * @param nodeValue  constant to compare against; for non-string columns it must already
 *                   be an instance of the column's boxed Java type
 * @param filterType comparison operator
 * @return a `SingleSeriesExpression` on `nodeName`; its filter is `null` when `nodeName`
 *         is not a field of `schema`
 * @throws UnsupportedOperationException if the column type does not support `filterType`,
 *                                       or `filterType` is not one of the operators above
 * @throws ClassCastException            if `nodeValue` is not an instance of the boxed
 *                                       type required by a non-string column
 */
def constructFilter(schema: StructType, nodeName: String, nodeValue: Any,
                    filterType: FilterTypes.Value): IExpression = {
  val index = schema.fieldNames.indexOf(nodeName)

  // Builds the expression for an already-converted constant: picks the ValueFilterApi
  // comparison that corresponds to `filterType` and attaches it to the series path.
  def comparison[T <: Comparable[T]](value: T, tsType: TSDataType): IExpression = {
    val valueFilter = filterType match {
      case FilterTypes.Eq => ValueFilterApi.eq(index, value, tsType)
      case FilterTypes.Gt => ValueFilterApi.gt(index, value, tsType)
      case FilterTypes.GtEq => ValueFilterApi.gtEq(index, value, tsType)
      case FilterTypes.Lt => ValueFilterApi.lt(index, value, tsType)
      case FilterTypes.LtEq => ValueFilterApi.ltEq(index, value, tsType)
      // Reject anything else explicitly instead of failing with a scala.MatchError.
      case other => throw new UnsupportedOperationException(s"Unsupported filter type $other")
    }
    new SingleSeriesExpression(new Path(nodeName, true), valueFilter)
  }

  if (index == -1) {
    // placeholder for an invalid filter in the current TsFile
    new SingleSeriesExpression(new Path(nodeName, true), null)
  } else {
    // Boolean and string columns are only comparable for equality; with any other
    // operator they fall through to the "Unsupported type" case below.
    val isEquality = filterType == FilterTypes.Eq

    schema(index).dataType match {
      case BooleanType if isEquality =>
        comparison(nodeValue.asInstanceOf[java.lang.Boolean], TSDataType.BOOLEAN)
      case IntegerType =>
        comparison(nodeValue.asInstanceOf[java.lang.Integer], TSDataType.INT32)
      case LongType =>
        comparison(nodeValue.asInstanceOf[java.lang.Long], TSDataType.INT64)
      case FloatType =>
        comparison(nodeValue.asInstanceOf[java.lang.Float], TSDataType.FLOAT)
      case DoubleType =>
        comparison(nodeValue.asInstanceOf[java.lang.Double], TSDataType.DOUBLE)
      case StringType if isEquality =>
        comparison(new Binary(nodeValue.toString, TSFileConfig.STRING_CHARSET), TSDataType.TEXT)
      case other => throw new UnsupportedOperationException(s"Unsupported type $other")
    }
  }
}