in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java [147:251]
/**
 * Initializes the workspace for the given storage type.
 *
 * <p>The steps are, in order:
 *
 * <ol>
 *   <li>create the workspace directories that do not exist yet (plus the local dist directory
 *       when the storage type is {@code LFS});
 *   <li>recreate the client and plugin directories and upload the local client / plugin files
 *       accepted by {@code fileFilter};
 *   <li>delete the shims directory and upload every local shims jar into a
 *       {@code flink-<version>} sub directory, where the version is group 1 of
 *       {@code PATTERN_FLINK_SHIMS_JAR};
 *   <li>create the local maven repository directory if it is missing.
 * </ol>
 *
 * <p>A storage type that is already recorded in {@code initialized} is skipped, so the work is
 * done at most once per storage type. The storage type is recorded only after all steps have
 * completed.
 *
 * <p>Preconditions are checked through {@code Utils.required}: the local client directory must be
 * a non-empty directory and at least one shims jar must be present (presumably a failed check
 * throws - confirm against {@code Utils.required}).
 *
 * @param storageType the storage type whose workspace should be prepared
 */
public synchronized void storageInitialize(StorageType storageType) {
  if (initialized.contains(storageType)) {
    return;
  }

  final String mkdirLog = "storage initialize, now mkdir [{}] starting ...";

  FsOperator fsOperator = FsOperator.of(storageType);
  Workspace workspace = Workspace.of(storageType);

  // 1. prepare workspace dir
  if (storageType.equals(LFS)) {
    String localDist = Workspace.APP_LOCAL_DIST();
    if (!fsOperator.exists(localDist)) {
      log.info(mkdirLog, localDist);
      fsOperator.mkdirs(localDist);
    }
  }

  // Same order as before: uploads, workspace, backups, savepoints, jars.
  String[] workspaceDirs = {
    workspace.APP_UPLOADS(),
    workspace.APP_WORKSPACE(),
    workspace.APP_BACKUPS(),
    workspace.APP_SAVEPOINTS(),
    workspace.APP_JARS()
  };
  for (String dir : workspaceDirs) {
    if (!fsOperator.exists(dir)) {
      log.info(mkdirLog, dir);
      fsOperator.mkdirs(dir);
    }
  }

  // 2. upload jar.
  // 2.1) upload client jar
  File client = WebUtils.getAppClientDir();
  // File#listFiles returns null when the path does not exist, is not a directory, or an I/O
  // error occurs, so a single null/length check covers "missing" as well as "empty" without
  // risking a NullPointerException.
  File[] clientEntries = client.listFiles();
  Utils.required(
      clientEntries != null && clientEntries.length > 0,
      client.getAbsolutePath().concat(" is not exists or empty directory "));

  String appClient = workspace.APP_CLIENT();
  fsOperator.mkCleanDirs(appClient);

  File[] clientFiles = client.listFiles(fileFilter);
  if (clientFiles != null) {
    for (File file : clientFiles) {
      log.info("load client:{} to {}", file.getName(), appClient);
      fsOperator.upload(file.getAbsolutePath(), appClient);
    }
  }

  // 2.2) upload plugin jar.
  String appPlugins = workspace.APP_PLUGINS();
  fsOperator.mkCleanDirs(appPlugins);

  File plugins = WebUtils.getAppPluginsDir();
  File[] pluginFiles = plugins.listFiles(fileFilter);
  if (pluginFiles == null) {
    // The local plugins directory is missing or unreadable: there is nothing to upload, so skip
    // instead of failing with a NullPointerException.
    log.warn("plugins dir [{}] is not a readable directory, skip upload.", plugins.getAbsolutePath());
  } else {
    for (File file : pluginFiles) {
      log.info("load plugin:{} to {}", file.getName(), appPlugins);
      fsOperator.upload(file.getAbsolutePath(), appPlugins);
    }
  }

  // 2.3) upload shims jar
  File[] shims =
      WebUtils.getAppLibDir()
          .listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern()));
  Utils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist");

  String appShims = workspace.APP_SHIMS();
  fsOperator.delete(appShims);

  for (File file : shims) {
    Matcher matcher = PATTERN_FLINK_SHIMS_JAR.matcher(file.getName());
    if (matcher.matches()) {
      // group(1) of the shims pattern is used as the version suffix of the target directory.
      String version = matcher.group(1);
      String shimsPath = appShims.concat("/flink-").concat(version);
      fsOperator.mkdirs(shimsPath);
      log.info("load shims:{} to {}", file.getName(), shimsPath);
      fsOperator.upload(file.getAbsolutePath(), shimsPath);
    }
  }

  // 2.4) create maven local repository dir
  String localMavenRepo = Workspace.MAVEN_LOCAL_PATH();
  // Create the directory only when it is missing. The previous check was inverted and therefore
  // never created a missing repository directory.
  if (!FsOperator.lfs().exists(localMavenRepo)) {
    log.info(mkdirLog, localMavenRepo);
    FsOperator.lfs().mkdirs(localMavenRepo);
  }

  initialized.add(storageType);
}