func main()

in sdks/java/container/boot.go [61:311]


// main boots the Java SDK harness inside the container. In order, it:
//
//  1. validates the required flags and fetches the provisioning information,
//  2. resolves the logging, artifact and control endpoints (non-empty values
//     from the provisioning information override the flags),
//  3. materializes the staged artifacts under <semiPersistDir>/<id>/staged,
//  4. exports the harness configuration through environment variables,
//  5. assembles the JVM arguments (heap limit, agents, classpath, system
//     properties, module flags), and
//  6. executes `java` with org.apache.beam.fn.harness.FnHarness as main class.
//
// Failures in mandatory steps are reported through log.Fatal/logger.Fatalf.
// Best-effort steps (physical memory detection, environment export, meta
// options) only log and continue.
func main() {
	flag.Parse()
	if *id == "" {
		log.Fatal("No id provided.")
	}
	if *provisionEndpoint == "" {
		log.Fatal("No provision endpoint provided.")
	}

	ctx := grpcx.WriteWorkerID(context.Background(), *id)

	info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
	if err != nil {
		log.Fatalf("Failed to obtain provisioning information: %v", err)
	}
	log.Printf("Provision info:\n%v", info)

	// Endpoints reported by the provisioning service win over the flags.
	// TODO(BEAM-8201): Simplify once flags are no longer used.
	if info.GetLoggingEndpoint().GetUrl() != "" {
		*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
	}
	if info.GetArtifactEndpoint().GetUrl() != "" {
		*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
	}
	if info.GetControlEndpoint().GetUrl() != "" {
		*controlEndpoint = info.GetControlEndpoint().GetUrl()
	}

	if *loggingEndpoint == "" {
		log.Fatal("No logging endpoint provided.")
	}
	if *artifactEndpoint == "" {
		log.Fatal("No artifact endpoint provided.")
	}
	if *controlEndpoint == "" {
		log.Fatal("No control endpoint provided.")
	}
	logger := &tools.Logger{Endpoint: *loggingEndpoint}

	logger.Printf(ctx, "Initializing java harness: %v", strings.Join(os.Args, " "))

	// setenv exports an environment variable for the Java process. A failure
	// is logged instead of being silently dropped, so that a harness that
	// starts with missing configuration can be diagnosed from the logs.
	setenv := func(key, value string) {
		if err := os.Setenv(key, value); err != nil {
			logger.Warnf(ctx, "Failed to set environment variable %v: %v", key, err)
		}
	}

	// (1) Obtain the pipeline options
	options, err := tools.ProtoToJSON(info.GetPipelineOptions())
	if err != nil {
		logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
	}

	// (2) Retrieve the staged user jars. We ignore any disk limit,
	// because the staged jars are mandatory.

	// Using the SDK Harness ID in the artifact destination path to make sure that dependencies used by multiple
	// SDK Harnesses in the same VM do not conflict. This is needed since some runners (for example, Dataflow)
	// may share the artifact staging directory across multiple SDK Harnesses
	// TODO(https://github.com/apache/beam/issues/20009): consider removing the SDK Harness ID from the staging path after Dataflow can properly
	// separate out dependencies per environment.
	dir := filepath.Join(*semiPersistDir, *id, "staged")

	artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
	if err != nil {
		logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err)
	}

	// (3) Invoke the Java harness, preserving artifact ordering in classpath.

	setenv("HARNESS_ID", *id)
	if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
		logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)
	}
	setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
	setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
	setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))

	if info.GetStatusEndpoint() != nil {
		setenv("STATUS_API_SERVICE_DESCRIPTOR", info.GetStatusEndpoint().String())
	}

	const jarsDir = "/opt/apache/beam/jars"
	const javaHarnessJar = "beam-sdks-java-harness.jar"
	defaultLoggingJars := []string{
		"slf4j-api.jar",
		"slf4j-jdk14.jar",
		"jcl-over-slf4j.jar",
		"log4j-over-slf4j.jar",
		"log4j-to-slf4j.jar",
	}

	// Classpath order: default logging jars, bundled harness jar, staged
	// artifacts (in the order returned by Materialize), meta-option entries.
	cp := []string{}
	if strings.Contains(options, "use_custom_logging_libraries") {
		// In this case, the logging libraries will be provided from the staged
		// artifacts.
		logger.Warnf(ctx, "Skipping default slf4j dependencies in classpath")
	} else {
		logger.Printf(ctx, "Using default slf4j dependencies in classpath")
		for _, jar := range defaultLoggingJars {
			cp = append(cp, filepath.Join(jarsDir, jar))
		}
	}

	hasWorkerExperiment := strings.Contains(options, "use_staged_dataflow_worker_jar")
	if hasWorkerExperiment {
		// Skip adding system "beam-sdks-java-harness.jar". User-provided jar will
		// be added to classpath as a normal user jar further below.
		logger.Printf(ctx, "Opted to use staged java harness. Make sure beam-sdks-java-harness is included or shaded in the staged jars.")
	} else {
		cp = append(cp, filepath.Join(jarsDir, javaHarnessJar))
	}

	for _, a := range artifacts {
		// Only the file name is needed here; the second return value is unused.
		name, _ := artifact.MustExtractFilePayload(a)
		if hasWorkerExperiment && name == "dataflow-worker.jar" {
			// With the worker experiment, dataflow-worker.jar is kept off the classpath.
			continue
		}
		cp = append(cp, filepath.Join(dir, filepath.FromSlash(name)))
	}

	// Determine the value passed to -Xmx (in bytes).
	var lim uint64
	if strings.Contains(options, "set_recommended_max_xmx") {
		lim = 32 << 30
	} else {
		size, err := syscallx.PhysicalMemorySize()
		if err != nil {
			// Best effort: keep going with a size of 0, but leave a trace of
			// why the heap limit was not derived from the real memory size.
			logger.Warnf(ctx, "Failed to determine physical memory size, assuming 0: %v", err)
			size = 0
		}
		lim = HeapSizeLimit(size)
	}

	args := []string{
		"-Xmx" + strconv.FormatUint(lim, 10),
		// ParallelGC the most adequate for high throughput and lower CPU utilization
		// It is the default GC in Java 8, but not on newer versions
		"-XX:+UseParallelGC",
		"-XX:+AlwaysActAsServerClassMachine",
		"-XX:-OmitStackTraceInFastThrow",
	}

	// Cloud Profiler agent: requires both a profiler service name and a
	// job_id entry in the provisioning metadata.
	enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
	enableGoogleCloudHeapSampling := strings.Contains(options, enableGoogleCloudHeapSamplingOption)
	if enableGoogleCloudProfiler {
		metadata := info.GetMetadata()
		profilerServiceName := ExtractProfilerServiceName(options, metadata)

		if profilerServiceName != "" {
			if jobID, idExists := metadata["job_id"]; idExists {
				if enableGoogleCloudHeapSampling {
					args = append(args, fmt.Sprintf(googleCloudProfilerAgentHeapArgs, profilerServiceName, jobID))
				} else {
					args = append(args, fmt.Sprintf(googleCloudProfilerAgentBaseArgs, profilerServiceName, jobID))
				}
				logger.Printf(ctx, "Turning on Cloud Profiling. Profile heap: %t, service: %s", enableGoogleCloudHeapSampling, profilerServiceName)
			} else {
				logger.Printf(ctx, "job_id is missing from metadata. Cannot enable profiling.")
			}
		}
	}

	disableJammAgent := strings.Contains(options, disableJammAgentOption)
	if disableJammAgent {
		logger.Printf(ctx, "Disabling Jamm agent. Measuring object size will be inaccurate.")
	} else {
		args = append(args, jammAgentArgs)
	}

	// The structured pipeline options are consulted twice below (heap dumps
	// and module/Avro flags); look the "options" struct up once.
	pipelineOptions, hasPipelineOptions := info.GetPipelineOptions().GetFields()["options"]

	// If heap dumping is enabled, configure the JVM to dump it on oom events.
	if hasPipelineOptions {
		if heapDumpOption, ok := pipelineOptions.GetStructValue().GetFields()["enableHeapDumps"]; ok {
			if heapDumpOption.GetBoolValue() {
				args = append(args, "-XX:+HeapDumpOnOutOfMemoryError",
					"-Dbeam.fn.heap_dump_dir="+filepath.Join(dir, "heapdumps"),
					"-XX:HeapDumpPath="+filepath.Join(dir, "heapdumps", "heap_dump.hprof"))
			}
		}
	}

	// Apply meta options
	const metaDir = "/opt/apache/beam/options"

	metaOptions, err := LoadMetaOptions(ctx, logger, metaDir)
	if err != nil {
		// Not fatal: continue with whatever LoadMetaOptions returned.
		logger.Errorf(ctx, "LoadMetaOptions failed: %v", err)
	}

	javaOptions := BuildOptions(ctx, logger, metaOptions)
	// (1) Add custom jvm arguments: "-server -Xmx1324 -XXfoo .."
	args = append(args, javaOptions.JavaArguments...)

	// (2) Add classpath: "-cp foo.jar:bar.jar:.."
	cp = append(cp, javaOptions.Classpath...)
	pathingjar, err := makePathingJar(cp)
	if err != nil {
		logger.Fatalf(ctx, "makePathingJar failed: %v", err)
	}
	args = append(args, "-cp", pathingjar)

	// (3) Add (sorted) properties: "-Dbar=baz -Dfoo=bar .."
	// Sorting the properties keeps the resulting command line deterministic.
	var properties []string
	for key, value := range javaOptions.Properties {
		properties = append(properties, fmt.Sprintf("-D%s=%s", key, value))
	}
	sort.Strings(properties)
	args = append(args, properties...)

	if hasPipelineOptions {
		optionFields := pipelineOptions.GetStructValue().GetFields()

		// Open modules specified in pipeline options
		if modules, ok := optionFields["jdkAddOpenModules"]; ok {
			for _, module := range modules.GetListValue().GetValues() {
				args = append(args, "--add-opens="+module.GetStringValue())
			}
		}
		// Add modules specified in pipeline options
		if modules, ok := optionFields["jdkAddRootModules"]; ok {
			for _, module := range modules.GetListValue().GetValues() {
				args = append(args, "--add-modules="+module.GetStringValue())
			}
		}
		// Add trusted Avro serializable classes
		var serializableClassesList []string
		if serializableClasses, ok := optionFields["avroSerializableClasses"]; ok {
			for _, cls := range serializableClasses.GetListValue().GetValues() {
				// User can specify an empty list, which is serialized as a single, blank value
				if cls.GetStringValue() != "" {
					serializableClassesList = append(serializableClassesList, cls.GetStringValue())
				}
			}
		} else {
			// Option absent: fall back to the default set of trusted classes.
			serializableClassesList = []string{
				"java.math.BigDecimal",
				"java.math.BigInteger",
				"java.net.URI",
				"java.net.URL",
				"java.io.File",
				"java.lang.Integer",
			}
		}
		if len(serializableClassesList) > 0 {
			args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClassesList, ","))
		}
	}

	// Automatically open modules for Java 11+
	// The agent is only attached when its jar is present in the image.
	openModuleAgentJar := filepath.Join(jarsDir, "open-module-agent.jar")
	if _, err := os.Stat(openModuleAgentJar); err == nil {
		args = append(args, "-javaagent:"+openModuleAgentJar)
	}
	args = append(args, "org.apache.beam.fn.harness.FnHarness")
	logger.Printf(ctx, "Executing: java %v", strings.Join(args, " "))

	logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...))
}