in wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java [157:229]
/**
 * Assembles one SQL {@code SELECT} statement for the given stage.
 *
 * <p>Starting at the stage's start task, the method repeatedly asks
 * {@code findJdbcExecutionOperatorTaskInStage} for the next task and sorts each task by its
 * operator type: filters become {@code WHERE} conditions (combined with {@code AND}), a
 * projection replaces the default {@code *} column list, and joins are appended after the
 * table name. An outbound channel instance is instantiated for the start task and then for
 * every task visited; the last one is returned alongside the query.
 *
 * <p>The structural preconditions (exactly one start task, exactly one terminal task, start
 * operator is a {@code TableSource}, at most three tasks in the stage, at most one projection)
 * are checked with {@code assert} only, so they are enforced only when assertions are enabled.
 *
 * @param stage   the stage whose tasks are translated into the query
 * @param context passed through to {@code instantiateOutboundChannel}
 * @return the SQL text (terminated by {@code ;}) together with the channel instance of the
 *         last task visited
 * @throws WayangException if a visited task's operator is not a filter, projection or join
 */
protected Tuple2<String, SqlQueryChannel.Instance> createSqlQuery(final ExecutionStage stage,
        final OptimizationContext context) {
    final Collection<?> sources = stage.getStartTasks();
    final Collection<?> sinks = stage.getTerminalTasks();

    // Check that the stage has a shape this method can translate.
    assert sources.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported";
    final ExecutionTask sourceTask = (ExecutionTask) sources.toArray()[0];
    assert sinks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported.";
    // Not read again below; the lookup itself still fails fast on an empty terminal task set.
    final ExecutionTask sinkTask = (ExecutionTask) sinks.toArray()[0];
    assert sourceTask.getOperator() instanceof TableSource
            : "Invalid JDBC stage: Start task has to be a TableSource";

    // The table source is the root of the query; its channel is the first "tip".
    final TableSource sourceTable = (TableSource) sourceTask.getOperator();
    SqlQueryChannel.Instance currentTip = this.instantiateOutboundChannel(sourceTask, context);

    // Buckets for the tasks found while walking the stage.
    final Collection<ExecutionTask> filters = new ArrayList<>(4);
    final Collection<ExecutionTask> joinOps = new ArrayList<>();
    ExecutionTask projectionOp = null;

    final Set<ExecutionTask> stageTasks = stage.getAllTasks();
    assert stageTasks.size() <= 3;

    // Walk from the source task to the end of the stage, one task at a time.
    for (ExecutionTask current = this.findJdbcExecutionOperatorTaskInStage(sourceTask, stage);
            current != null;
            current = this.findJdbcExecutionOperatorTaskInStage(current, stage)) {
        if (current.getOperator() instanceof JdbcFilterOperator) {
            filters.add(current);
        } else if (current.getOperator() instanceof JdbcProjectionOperator) {
            // Only a single projection per stage is handled for now.
            assert projectionOp == null;
            projectionOp = current;
        } else if (current.getOperator() instanceof JdbcJoinOperator) {
            joinOps.add(current);
        } else {
            throw new WayangException(String.format("Unsupported JDBC execution task %s", current.toString()));
        }

        // Advance the tip: the new channel is instantiated on top of the previous one.
        currentTip = this.instantiateOutboundChannel(current, context, currentTip);
    }

    // Render the individual clauses: table first, then filters, projection and joins.
    final String fromClause = this.getSqlClause(sourceTable);

    final Collection<String> whereClauses = new ArrayList<>(filters.size());
    for (final ExecutionTask filter : filters) {
        whereClauses.add(this.getSqlClause(filter.getOperator()));
    }

    final String selectClause;
    if (projectionOp != null) {
        selectClause = this.getSqlClause(projectionOp.getOperator());
    } else {
        selectClause = "*";
    }

    final Collection<String> joinClauses = new ArrayList<>(joinOps.size());
    for (final ExecutionTask joinOp : joinOps) {
        joinClauses.add(this.getSqlClause(joinOp.getOperator()));
    }

    // Stitch the statement together.
    final StringBuilder query = new StringBuilder(1000);
    query.append("SELECT ").append(selectClause);
    query.append(" FROM ").append(fromClause);
    for (final String joinClause : joinClauses) {
        query.append(" ").append(joinClause);
    }
    if (!whereClauses.isEmpty()) {
        query.append(" WHERE ").append(String.join(" AND ", whereClauses));
    }
    query.append(';');

    return new Tuple2<>(query.toString(), currentTip);
}