// Function: main
// Location: sdks/java/container/boot.go [60:278]

// main boots the Java SDK harness inside its container. It:
//  1. obtains provisioning information (endpoints, pipeline options) from the runner,
//  2. materializes the staged artifact dependencies to local disk,
//  3. assembles the JVM classpath and arguments (heap size, agents, modules), and
//  4. execs the Java FnHarness, blocking until it exits.
// On any unrecoverable error it logs fatally and terminates the container.
func main() {
	flag.Parse()
	// The worker id and provision endpoint are the only mandatory flags;
	// the remaining endpoints can be recovered from the provision response.
	if *id == "" {
		log.Fatal("No id provided.")
	}
	if *provisionEndpoint == "" {
		log.Fatal("No provision endpoint provided.")
	}

	// Attach the worker ID to the context so it is propagated on outgoing
	// gRPC calls (provision, artifact retrieval, logging).
	ctx := grpcx.WriteWorkerID(context.Background(), *id)

	info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
	if err != nil {
		log.Fatalf("Failed to obtain provisioning information: %v", err)
	}
	log.Printf("Provision info:\n%v", info)

	// Endpoints from the provision response take precedence over the
	// corresponding flags when present.
	// TODO(BEAM-8201): Simplify once flags are no longer used.
	if info.GetLoggingEndpoint().GetUrl() != "" {
		*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
	}
	if info.GetArtifactEndpoint().GetUrl() != "" {
		*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
	}
	if info.GetControlEndpoint().GetUrl() != "" {
		*controlEndpoint = info.GetControlEndpoint().GetUrl()
	}

	// After merging provision info, all three endpoints must be known.
	if *loggingEndpoint == "" {
		log.Fatal("No logging endpoint provided.")
	}
	if *artifactEndpoint == "" {
		log.Fatal("No artifact endpoint provided.")
	}
	if *controlEndpoint == "" {
		log.Fatal("No control endpoint provided.")
	}
	// From here on, log through the runner's logging service rather than
	// the local logger, so messages surface in the runner UI.
	logger := &tools.Logger{Endpoint: *loggingEndpoint}

	logger.Printf(ctx, "Initializing java harness: %v", strings.Join(os.Args, " "))

	// (1) Obtain the pipeline options
	options, err := tools.ProtoToJSON(info.GetPipelineOptions())
	if err != nil {
		logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
	}

	// (2) Retrieve the staged user jars. We ignore any disk limit,
	// because the staged jars are mandatory.

	// Using the SDK Harness ID in the artifact destination path to make sure that dependencies used by multiple
	// SDK Harnesses in the same VM do not conflict. This is needed since some runners (for example, Dataflow)
	// may share the artifact staging directory across multiple SDK Harnesses
	// TODO(https://github.com/apache/beam/issues/20009): consider removing the SDK Harness ID from the staging path after Dataflow can properly
	// separate out dependencies per environment.
	dir := filepath.Join(*semiPersistDir, *id, "staged")

	artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
	if err != nil {
		logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err)
	}

	// (3) Invoke the Java harness, preserving artifact ordering in classpath.

	// The Java harness reads its configuration from these environment
	// variables (service descriptors are passed in proto text form).
	os.Setenv("HARNESS_ID", *id)
	if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
		logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)
	}
	os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
	os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
	os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))

	// The status endpoint is optional; only export it when provided.
	if info.GetStatusEndpoint() != nil {
		os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", info.GetStatusEndpoint().String())
	}

	// Jars baked into the container image at build time.
	const jarsDir = "/opt/apache/beam/jars"
	const javaHarnessJar = "beam-sdks-java-harness.jar"
	// Default slf4j bridge/binding jars used unless the user stages their own
	// logging libraries.
	defaultLoggingJars := []string{
		"slf4j-api.jar",
		"slf4j-jdk14.jar",
		"jcl-over-slf4j.jar",
		"log4j-over-slf4j.jar",
		"log4j-to-slf4j.jar",
	}
	cp := []string{}
	// NOTE(review): experiment detection is a substring match on the JSON-
	// serialized options, not a structured lookup — it can match the option
	// name appearing anywhere in the options payload.
	if strings.Contains(options, "use_custom_logging_libraries") {
		// In this case, the logging libraries will be provided from the staged
		// artifacts.
		logger.Warnf(ctx, "Skipping default slf4j dependencies in classpath")
	} else {
		logger.Printf(ctx, "Using default slf4j dependencies in classpath")
		for _, jar := range defaultLoggingJars {
			cp = append(cp, filepath.Join(jarsDir, jar))
		}
	}
	var hasWorkerExperiment = strings.Contains(options, "use_staged_dataflow_worker_jar")

	if hasWorkerExperiment {
		// Skip adding system "beam-sdks-java-harness.jar". User-provided jar will
		// be added to classpath as a normal user jar further below.
		logger.Printf(ctx, "Opted to use staged java harness. Make sure beam-sdks-java-harness is included or shaded in the staged jars.")
	} else {
		cp = append(cp, filepath.Join(jarsDir, javaHarnessJar))
	}

	// Append staged artifacts in the order they were materialized; with the
	// worker experiment, the legacy dataflow-worker.jar is excluded.
	for _, a := range artifacts {
		name, _ := artifact.MustExtractFilePayload(a)
		if hasWorkerExperiment {
			if name == "dataflow-worker.jar" {
				continue
			}
		}
		cp = append(cp, filepath.Join(dir, filepath.FromSlash(name)))
	}

	// Determine the JVM max heap (-Xmx) in bytes.
	var lim uint64
	if strings.Contains(options, "set_recommended_max_xmx") {
		// Fixed recommended cap of 32 GiB.
		lim = 32 << 30
	} else {
		// Derive the limit from physical memory; on probe failure fall back
		// to 0 and let HeapSizeLimit choose its default.
		size, err := syscallx.PhysicalMemorySize()
		if err != nil {
			size = 0
		}
		lim = HeapSizeLimit(size)
	}

	args := []string{
		"-Xmx" + strconv.FormatUint(lim, 10),
		// ParallelGC the most adequate for high throughput and lower CPU utilization
		// It is the default GC in Java 8, but not on newer versions
		"-XX:+UseParallelGC",
		"-XX:+AlwaysActAsServerClassMachine",
		"-XX:-OmitStackTraceInFastThrow",
	}

	// Optionally attach the Cloud Profiler agent; it requires both job_name
	// and job_id from the provision metadata to label profiles.
	enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
	enableGoogleCloudHeapSampling := strings.Contains(options, enableGoogleCloudHeapSamplingOption)
	if enableGoogleCloudProfiler {
		if metadata := info.GetMetadata(); metadata != nil {
			if jobName, nameExists := metadata["job_name"]; nameExists {
				if jobId, idExists := metadata["job_id"]; idExists {
					if enableGoogleCloudHeapSampling {
						args = append(args, fmt.Sprintf(googleCloudProfilerAgentHeapArgs, jobName, jobId))
					} else {
						args = append(args, fmt.Sprintf(googleCloudProfilerAgentBaseArgs, jobName, jobId))
					}
					logger.Printf(ctx, "Turning on Cloud Profiling. Profile heap: %t", enableGoogleCloudHeapSampling)
				} else {
					logger.Printf(ctx, "Required job_id missing from metadata, profiling will not be enabled without it.")
				}
			} else {
				logger.Printf(ctx, "Required job_name missing from metadata, profiling will not be enabled without it.")
			}
		} else {
			logger.Printf(ctx, "enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled.")
		}
	}

	// The Jamm agent (object-size measurement) is attached by default and
	// can be disabled via pipeline option.
	disableJammAgent := strings.Contains(options, disableJammAgentOption)
	if disableJammAgent {
		logger.Printf(ctx, "Disabling Jamm agent. Measuring object size will be inaccurate.")
	} else {
		args = append(args, jammAgentArgs)
	}
	// Apply meta options
	const metaDir = "/opt/apache/beam/options"

	// Note: Error is unchecked, so parsing errors won't abort container.
	// TODO: verify if it's intentional or not.
	metaOptions, _ := LoadMetaOptions(ctx, logger, metaDir)

	javaOptions := BuildOptions(ctx, logger, metaOptions)
	// (1) Add custom jvm arguments: "-server -Xmx1324 -XXfoo .."
	args = append(args, javaOptions.JavaArguments...)

	// (2) Add classpath: "-cp foo.jar:bar.jar:.."
	if len(javaOptions.Classpath) > 0 {
		cp = append(cp, javaOptions.Classpath...)
	}
	// The full classpath is wrapped in a single pathing jar to avoid
	// command-line length limits.
	pathingjar, err := makePathingJar(cp)
	if err != nil {
		logger.Fatalf(ctx, "makePathingJar failed: %v", err)
	}
	args = append(args, "-cp")
	args = append(args, pathingjar)

	// (3) Add (sorted) properties: "-Dbar=baz -Dfoo=bar .."
	// Sorting makes the command line deterministic across runs.
	var properties []string
	for key, value := range javaOptions.Properties {
		properties = append(properties, fmt.Sprintf("-D%s=%s", key, value))
	}
	sort.Strings(properties)
	args = append(args, properties...)

	// Module flags requested through pipeline options (for JPMS, Java 9+).
	if pipelineOptions, ok := info.GetPipelineOptions().GetFields()["options"]; ok {
		// Open modules specified in pipeline options
		if modules, ok := pipelineOptions.GetStructValue().GetFields()["jdkAddOpenModules"]; ok {
			for _, module := range modules.GetListValue().GetValues() {
				args = append(args, "--add-opens="+module.GetStringValue())
			}
		}
		// Add modules specified in pipeline options
		if modules, ok := pipelineOptions.GetStructValue().GetFields()["jdkAddRootModules"]; ok {
			for _, module := range modules.GetListValue().GetValues() {
				args = append(args, "--add-modules="+module.GetStringValue())
			}
		}
	}
	// Automatically open modules for Java 11+
	// (only when the agent jar exists in the image; absent on Java 8 images).
	openModuleAgentJar := "/opt/apache/beam/jars/open-module-agent.jar"
	if _, err := os.Stat(openModuleAgentJar); err == nil {
		args = append(args, "-javaagent:"+openModuleAgentJar)
	}
	args = append(args, "org.apache.beam.fn.harness.FnHarness")
	logger.Printf(ctx, "Executing: java %v", strings.Join(args, " "))

	// Execute blocks for the lifetime of the harness; when it returns, the
	// JVM has exited and the container terminates with a fatal log.
	logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...))
}