in statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterEntryPoint.java [69:101]
public static void main(String[] args) {
EnvironmentInformation.logEnvironmentInfo(
LOG, StatefulFunctionsClusterEntryPoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
final CommandLineParser<StatefulFunctionsClusterConfiguration> commandLineParser =
new CommandLineParser<>(new StatefulFunctionsClusterConfigurationParserFactory());
StatefulFunctionsClusterConfiguration clusterConfiguration = null;
try {
clusterConfiguration = commandLineParser.parse(args);
} catch (Exception e) {
LOG.error("Could not parse command line arguments {}.", args, e);
commandLineParser.printHelp(StatefulFunctionsClusterEntryPoint.class.getSimpleName());
System.exit(1);
}
Configuration configuration = loadConfiguration(clusterConfiguration);
addStatefulFunctionsConfiguration(configuration);
setDefaultExecutionModeIfNotConfigured(configuration);
StatefulFunctionsClusterEntryPoint entrypoint =
new StatefulFunctionsClusterEntryPoint(
configuration,
resolveJobIdForCluster(
Optional.ofNullable(clusterConfiguration.getJobId()), configuration),
clusterConfiguration.getSavepointRestoreSettings(),
clusterConfiguration.getParallelism(),
clusterConfiguration.getArgs());
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}