in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/utils/KuduTableUtils.java [232:272]
/**
 * Converts the operands of an OR call into a single Kudu IS_IN filter.
 *
 * <p>An IN operation arrives as {@code or(equals(field, value1), equals(field, value2), ...)}
 * in blink. The conversion succeeds only when every operand is an EQUALS call whose first
 * argument is a field reference and whose second argument is a value literal, and all
 * operands reference the same field.
 *
 * @param children the operands of the OR call
 * @return the IS_IN filter over the collected literal values, or {@link Optional#empty()} when
 *     {@code children} is empty, an operand does not have the expected shape, the operands
 *     reference different fields, or a literal value cannot be extracted
 */
private static Optional<KuduFilterInfo> convertIsInExpression(List<Expression> children) {
    // Without operands there is no column to filter on; do not build a filter for column "".
    if (children.isEmpty()) {
        return Optional.empty();
    }

    List<Object> values = new ArrayList<>(children.size());
    String columnName = null;
    for (Expression child : children) {
        if (!(child instanceof CallExpression)) {
            return Optional.empty();
        }
        CallExpression callExpression = (CallExpression) child;
        FunctionDefinition functionDefinition = callExpression.getFunctionDefinition();
        List<Expression> subChildren = callExpression.getChildren();
        // Only the exact shape equals(field, literal) is supported.
        if (!functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)
                || subChildren.size() != 2
                || !isFieldReferenceExpression(subChildren.get(0))
                || !isValueLiteralExpression(subChildren.get(1))) {
            return Optional.empty();
        }
        FieldReferenceExpression fieldReferenceExpression =
                (FieldReferenceExpression) subChildren.get(0);
        ValueLiteralExpression valueLiteralExpression =
                (ValueLiteralExpression) subChildren.get(1);

        // The first operand fixes the column; every later operand must reference the same one.
        String fieldName = fieldReferenceExpression.getName();
        if (columnName == null) {
            columnName = fieldName;
        } else if (!columnName.equals(fieldName)) {
            return Optional.empty();
        }

        Object value = extractValueLiteral(fieldReferenceExpression, valueLiteralExpression);
        if (value == null) {
            return Optional.empty();
        }
        values.add(value);
    }
    KuduFilterInfo.Builder builder = KuduFilterInfo.Builder.create(columnName);
    return Optional.of(builder.isIn(values).build());
}