in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java [413:496]
private BuildPipeline createPipelineInstance(@Nonnull Application app) {
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
ExecutionMode executionMode = app.getExecutionModeEnum();
String mainClass = ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
switch (executionMode) {
case YARN_APPLICATION:
String yarnProvidedPath = app.getAppLib();
String localWorkspace = app.getLocalAppHome().concat("/lib");
if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOM_CODE)
&& app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) {
yarnProvidedPath = app.getAppHome();
localWorkspace = app.getLocalAppHome();
}
FlinkYarnApplicationBuildRequest yarnAppRequest =
new FlinkYarnApplicationBuildRequest(
app.getJobName(),
mainClass,
localWorkspace,
yarnProvidedPath,
app.getDevelopmentMode(),
getMergedDependencyInfo(app));
log.info("Submit params to building pipeline : {}", yarnAppRequest);
return FlinkYarnApplicationBuildPipeline.of(yarnAppRequest);
case YARN_PER_JOB:
case YARN_SESSION:
case REMOTE:
FlinkRemotePerJobBuildRequest buildRequest =
new FlinkRemotePerJobBuildRequest(
app.getJobName(),
app.getLocalAppHome(),
mainClass,
flinkUserJar,
app.isCustomCodeJob(),
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
getMergedDependencyInfo(app));
log.info("Submit params to building pipeline : {}", buildRequest);
return FlinkRemoteBuildPipeline.of(buildRequest);
case KUBERNETES_NATIVE_SESSION:
FlinkK8sSessionBuildRequest k8sSessionBuildRequest =
new FlinkK8sSessionBuildRequest(
app.getJobName(),
app.getLocalAppHome(),
mainClass,
flinkUserJar,
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
getMergedDependencyInfo(app),
app.getClusterId(),
app.getK8sNamespace());
log.info("Submit params to building pipeline : {}", k8sSessionBuildRequest);
return FlinkK8sSessionBuildPipeline.of(k8sSessionBuildRequest);
case KUBERNETES_NATIVE_APPLICATION:
FlinkK8sApplicationBuildRequest k8sApplicationBuildRequest =
new FlinkK8sApplicationBuildRequest(
app.getJobName(),
app.getLocalAppHome(),
mainClass,
flinkUserJar,
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
getMergedDependencyInfo(app),
app.getClusterId(),
app.getK8sNamespace(),
app.getFlinkImage(),
app.getK8sPodTemplates(),
app.getK8sHadoopIntegration() != null ? app.getK8sHadoopIntegration() : false,
DockerConf.of(
settingService.getDockerRegisterAddress(),
settingService.getDockerRegisterNamespace(),
settingService.getDockerRegisterUser(),
settingService.getDockerRegisterPassword()),
app.getIngressTemplate());
log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for ExecutionMode: " + app.getExecutionModeEnum());
}
}