in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java [1249:1401]
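    /**
     * Imports a workflow definition from an uploaded zip archive of SQL files. Each regular
     * file in the archive becomes one SQL task; task metadata (name, upstream tasks,
     * datasource) is parsed from "-- key: value" comments inside the file.
     */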
public Map<String, Object> importSqlWorkflowDefinition(User loginUser, long projectCode, MultipartFile file) {
        Project project = projectMapper.queryByCode(projectCode);
        Map<String, Object> result =
                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_IMPORT);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
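        // Derive the workflow definition name from the uploaded file name: drop the
        // extension, then append the import suffix to avoid clashing with existing names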
String workflowDefinitionName =
file.getOriginalFilename() == null ? file.getName() : file.getOriginalFilename();
int index = workflowDefinitionName.lastIndexOf(".");
if (index > 0) {
workflowDefinitionName = workflowDefinitionName.substring(0, index);
}
workflowDefinitionName = getNewName(workflowDefinitionName, IMPORT_SUFFIX);
WorkflowDefinition workflowDefinition;
List<TaskDefinitionLog> taskDefinitionList = new ArrayList<>();
List<WorkflowTaskRelationLog> workflowTaskRelationLogList = new ArrayList<>();
        // Thresholds guarding against zip bomb attacks
final int THRESHOLD_ENTRIES = 10000;
final int THRESHOLD_SIZE = 1000000000; // 1 GB
final double THRESHOLD_RATIO = 10;
        int totalEntryArchive = 0;
        // Cumulative uncompressed size across all entries in the archive
        int totalSizeArchive = 0;
// In most cases, there will be only one data source
Map<String, DataSource> dataSourceCache = new HashMap<>(1);
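        // Collected while parsing, used afterwards to wire up the task relations:
        // task name -> generated task code, and task name -> declared upstream task names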
Map<String, Long> taskNameToCode = new HashMap<>(16);
Map<String, List<String>> taskNameToUpstream = new HashMap<>(16);
        try (ZipInputStream zIn = new ZipInputStream(file.getInputStream())) {
// build workflow definition
workflowDefinition = new WorkflowDefinition(projectCode,
workflowDefinitionName,
CodeGenerateUtils.genCode(),
"",
"[]", null,
0, loginUser.getId());
ZipEntry entry;
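            // One pass over the archive: every regular file yields one SQL task definition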
while ((entry = zIn.getNextEntry()) != null) {
totalEntryArchive++;
                // Uncompressed size of the current entry, reset per entry
                int totalSizeEntry = 0;
                if (!entry.isDirectory()) {
                    // Create a fresh reader for each entry: an InputStreamReader latches end-of-stream
                    // once it sees it, so a single reader shared across entries would stop returning
                    // data after the first entry. It is deliberately not closed here, since closing it
                    // would also close the shared ZipInputStream; UTF-8 matches the byte counting below.
                    BufferedReader bufferedReader =
                            new BufferedReader(new InputStreamReader(zIn, StandardCharsets.UTF_8));
StringBuilder sql = new StringBuilder();
String taskName = null;
String datasourceName = null;
List<String> upstreams = Collections.emptyList();
String line;
while ((line = bufferedReader.readLine()) != null) {
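                        // Track uncompressed bytes for the zip bomb checks below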
int nBytes = line.getBytes(StandardCharsets.UTF_8).length;
totalSizeEntry += nBytes;
totalSizeArchive += nBytes;
                        // getCompressedSize() can be -1 (unknown) when reading from a stream,
                        // so only check the ratio when a positive compressed size is available
                        if (entry.getCompressedSize() > 0) {
                            double compressionRatio = (double) totalSizeEntry / entry.getCompressedSize();
                            if (compressionRatio > THRESHOLD_RATIO) {
                                throw new IllegalStateException(
                                        "Ratio between compressed and uncompressed data is highly suspicious, looks like a Zip Bomb Attack.");
                            }
                        }
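                        // Task metadata is carried in SQL line comments of the form "-- key: value",
                        // e.g. "-- name: task_a", "-- upstream: task_b, task_c", "-- datasource: ds1";
                        // recognized comments are stripped from the SQL body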
int commentIndex = line.indexOf("-- ");
if (commentIndex >= 0) {
int colonIndex = line.indexOf(":", commentIndex);
if (colonIndex > 0) {
String key = line.substring(commentIndex + 3, colonIndex).trim().toLowerCase();
String value = line.substring(colonIndex + 1).trim();
switch (key) {
case "name":
taskName = value;
line = line.substring(0, commentIndex);
break;
case "upstream":
                                        upstreams = Arrays.stream(value.split(","))
                                                .map(String::trim)
                                                .filter(s -> !s.isEmpty())
                                                .collect(Collectors.toList());
line = line.substring(0, commentIndex);
break;
case "datasource":
datasourceName = value;
line = line.substring(0, commentIndex);
break;
default:
break;
}
}
}
if (!"".equals(line)) {
sql.append(line).append("\n");
}
}
                    // No "-- name:" comment found: derive the task name from the entry path,
                    // e.g. import/sql1.sql -> sql1
                    if (taskName == null) {
taskName = entry.getName();
                        // Strip any directory prefix (use the last separator to handle nested dirs)
                        index = taskName.lastIndexOf("/");
                        if (index >= 0) {
                            taskName = taskName.substring(index + 1);
                        }
index = taskName.lastIndexOf(".");
if (index > 0) {
taskName = taskName.substring(0, index);
}
}
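                    // Resolve the datasource declared for this task, caching lookups since
                    // most archives reference a single datasource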
                    DataSource dataSource = dataSourceCache.get(datasourceName);
                    if (dataSource == null) {
                        dataSource = queryDatasourceByNameAndUser(datasourceName, loginUser);
                        if (dataSource == null) {
                            log.error("Datasource {} not found, its name may be invalid.", datasourceName);
                            putMsg(result, Status.DATASOURCE_NAME_ILLEGAL);
                            return result;
                        }
                        dataSourceCache.put(datasourceName, dataSource);
                    }
                    // Strip the trailing newline appended above; guard against an empty script
                    String sqlScript = sql.length() > 0 ? sql.substring(0, sql.length() - 1) : "";
                    TaskDefinitionLog taskDefinition =
                            buildNormalSqlTaskDefinition(taskName, dataSource, sqlScript);
taskDefinitionList.add(taskDefinition);
taskNameToCode.put(taskDefinition.getName(), taskDefinition.getCode());
taskNameToUpstream.put(taskDefinition.getName(), upstreams);
}
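                // Archive-wide limits, re-checked after every entry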
                if (totalSizeArchive > THRESHOLD_SIZE) {
                    throw new IllegalStateException(
                            "The uncompressed data size is too large for the application's resource capacity.");
                }
                if (totalEntryArchive > THRESHOLD_ENTRIES) {
                    throw new IllegalStateException(
                            "Too many entries in this archive, which can lead to inode exhaustion on the system.");
                }
}
} catch (Exception e) {
log.error("Import workflow definition error.", e);
putMsg(result, Status.IMPORT_WORKFLOW_DEFINE_ERROR);
return result;
}
        // Build task relations: a task with no upstreams (or whose only upstream is the
        // literal "root" when no task named "root" exists) is attached to the workflow
        // start node (upstream code 0); otherwise it is linked to each named upstream.
for (Map.Entry<String, Long> entry : taskNameToCode.entrySet()) {
List<String> upstreams = taskNameToUpstream.get(entry.getKey());
if (CollectionUtils.isEmpty(upstreams)
|| (upstreams.size() == 1 && upstreams.contains("root") && !taskNameToCode.containsKey("root"))) {
WorkflowTaskRelationLog workflowTaskRelationLog = buildNormalTaskRelation(0, entry.getValue());
workflowTaskRelationLogList.add(workflowTaskRelationLog);
continue;
}
            for (String upstream : upstreams) {
                Long upstreamCode = taskNameToCode.get(upstream);
                if (upstreamCode == null) {
                    // A reference to an unknown task would otherwise throw an NPE on unboxing
                    log.error("Upstream task {} of task {} does not exist in the archive.",
                            upstream, entry.getKey());
                    putMsg(result, Status.IMPORT_WORKFLOW_DEFINE_ERROR);
                    return result;
                }
                WorkflowTaskRelationLog workflowTaskRelationLog =
                        buildNormalTaskRelation(upstreamCode, entry.getValue());
                workflowTaskRelationLogList.add(workflowTaskRelationLog);
            }
}
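        // Persist the workflow definition, its task definitions, and the relations as one DAG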
return createDagDefine(loginUser, workflowTaskRelationLogList, workflowDefinition, taskDefinitionList);
}