in connectors/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala [399:521]
def constructExpression(schema: StructType, nodeName: String, nodeValue: String,
filterType: FilterTypes.Value, device_name: String): IExpression = {
val fieldNames = schema.fieldNames
val index = fieldNames.indexOf(nodeName)
if (index == -1) {
// placeholder for an invalid filter in the current TsFile
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true), null)
filter
} else {
val dataType = schema.get(index).dataType
filterType match {
case FilterTypes.Eq =>
dataType match {
case BooleanType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.eq(index, new java.lang.Boolean(nodeValue), TSDataType.BOOLEAN))
filter
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.eq(index, new java.lang.Integer(nodeValue), TSDataType.INT32))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.eq(index, new java.lang.Long(nodeValue), TSDataType.INT64))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.eq(index, new java.lang.Float(nodeValue), TSDataType.FLOAT))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.eq(index, new java.lang.Double(nodeValue), TSDataType.DOUBLE))
filter
case StringType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.eq(index, nodeValue, TSDataType.TEXT))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
case FilterTypes.Gt =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.gt(index, new java.lang.Integer(nodeValue), TSDataType.INT32))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.gt(index, new java.lang.Long(nodeValue), TSDataType.INT64))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.gt(index, new java.lang.Float(nodeValue), TSDataType.FLOAT))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.gt(index, new java.lang.Double(nodeValue), TSDataType.DOUBLE))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
case FilterTypes.GtEq =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.gtEq(index, new java.lang.Integer(nodeValue), TSDataType.INT32))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.gtEq(index, new java.lang.Long(nodeValue), TSDataType.INT64))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.gtEq(index, new java.lang.Float(nodeValue), TSDataType.FLOAT))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.gtEq(index, new java.lang.Double(nodeValue), TSDataType.DOUBLE))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
case FilterTypes.Lt =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.lt(index, new java.lang.Integer(nodeValue), TSDataType.INT32))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.lt(index, new java.lang.Long(nodeValue), TSDataType.INT64))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.lt(index, new java.lang.Float(nodeValue), TSDataType.FLOAT))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.lt(index, new java.lang.Double(nodeValue), TSDataType.DOUBLE))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
case FilterTypes.LtEq =>
dataType match {
case IntegerType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.ltEq(index, new java.lang.Integer(nodeValue), TSDataType.INT32))
filter
case LongType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.ltEq(index, new java.lang.Long(nodeValue), TSDataType.INT64))
filter
case FloatType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.ltEq(index, new java.lang.Float(nodeValue), TSDataType.FLOAT))
filter
case DoubleType =>
val filter = new SingleSeriesExpression(new Path(device_name, nodeName, true),
ValueFilterApi.ltEq(index, new java.lang.Double(nodeValue), TSDataType.DOUBLE))
filter
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
}
}
}