public synchronized void storageInitialize()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java [147:251]


  public synchronized void storageInitialize(StorageType storageType) {

    if (initialized.contains(storageType)) {
      return;
    }

    final String mkdirLog = "storage initialize, now mkdir [{}] starting ...";

    FsOperator fsOperator = FsOperator.of(storageType);
    Workspace workspace = Workspace.of(storageType);

    // 1. prepare workspace dir
    if (storageType.equals(LFS)) {
      String localDist = Workspace.APP_LOCAL_DIST();
      if (!fsOperator.exists(localDist)) {
        log.info(mkdirLog, localDist);
        fsOperator.mkdirs(localDist);
      }
    }

    String appUploads = workspace.APP_UPLOADS();
    if (!fsOperator.exists(appUploads)) {
      log.info(mkdirLog, appUploads);
      fsOperator.mkdirs(appUploads);
    }

    String appWorkspace = workspace.APP_WORKSPACE();
    if (!fsOperator.exists(appWorkspace)) {
      log.info(mkdirLog, appWorkspace);
      fsOperator.mkdirs(appWorkspace);
    }

    String appBackups = workspace.APP_BACKUPS();
    if (!fsOperator.exists(appBackups)) {
      log.info(mkdirLog, appBackups);
      fsOperator.mkdirs(appBackups);
    }

    String appSavePoints = workspace.APP_SAVEPOINTS();
    if (!fsOperator.exists(appSavePoints)) {
      log.info(mkdirLog, appSavePoints);
      fsOperator.mkdirs(appSavePoints);
    }

    String appJars = workspace.APP_JARS();
    if (!fsOperator.exists(appJars)) {
      log.info(mkdirLog, appJars);
      fsOperator.mkdirs(appJars);
    }

    // 2. upload jar.
    // 2.1) upload client jar
    File client = WebUtils.getAppClientDir();
    Utils.required(
        client.exists() && client.listFiles().length > 0,
        client.getAbsolutePath().concat(" is not exists or empty directory "));

    String appClient = workspace.APP_CLIENT();
    fsOperator.mkCleanDirs(appClient);

    for (File file : client.listFiles(fileFilter)) {
      log.info("load client:{} to {}", file.getName(), appClient);
      fsOperator.upload(file.getAbsolutePath(), appClient);
    }

    // 2.2) upload plugin jar.
    String appPlugins = workspace.APP_PLUGINS();
    fsOperator.mkCleanDirs(appPlugins);

    File plugins = WebUtils.getAppPluginsDir();
    for (File file : plugins.listFiles(fileFilter)) {
      log.info("load plugin:{} to {}", file.getName(), appPlugins);
      fsOperator.upload(file.getAbsolutePath(), appPlugins);
    }

    // 2.3) upload shims jar
    File[] shims =
        WebUtils.getAppLibDir()
            .listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern()));

    Utils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist");

    String appShims = workspace.APP_SHIMS();
    fsOperator.delete(appShims);

    for (File file : shims) {
      Matcher matcher = PATTERN_FLINK_SHIMS_JAR.matcher(file.getName());
      if (matcher.matches()) {
        String version = matcher.group(1);
        String shimsPath = appShims.concat("/flink-").concat(version);
        fsOperator.mkdirs(shimsPath);
        log.info("load shims:{} to {}", file.getName(), shimsPath);
        fsOperator.upload(file.getAbsolutePath(), shimsPath);
      }
    }

    // 2.4) create maven local repository dir

    String localMavenRepo = Workspace.MAVEN_LOCAL_PATH();
    if (FsOperator.lfs().exists(localMavenRepo)) {
      FsOperator.lfs().mkdirs(localMavenRepo);
    }

    initialized.add(storageType);
  }