in sdks/java/container/boot.go [60:278]
// main bootstraps the Java SDK harness container. It obtains provisioning
// information from the runner, materializes staged artifacts, assembles the
// JVM classpath and arguments, and finally executes the Java FnHarness.
// It never returns normally: every failure path exits via log.Fatal or
// logger.Fatalf, and success ends in the final Fatalf that reports the
// exit status of the spawned java process.
func main() {
	flag.Parse()
	if *id == "" {
		log.Fatal("No id provided.")
	}
	if *provisionEndpoint == "" {
		log.Fatal("No provision endpoint provided.")
	}

	// Tag all outgoing gRPC calls with the worker id so the runner can
	// attribute them to this harness instance.
	ctx := grpcx.WriteWorkerID(context.Background(), *id)

	info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
	if err != nil {
		log.Fatalf("Failed to obtain provisioning information: %v", err)
	}
	log.Printf("Provision info:\n%v", info)

	// TODO(BEAM-8201): Simplify once flags are no longer used.
	// Endpoints supplied by the provision service take precedence over the
	// corresponding command-line flags.
	if info.GetLoggingEndpoint().GetUrl() != "" {
		*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
	}
	if info.GetArtifactEndpoint().GetUrl() != "" {
		*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
	}
	if info.GetControlEndpoint().GetUrl() != "" {
		*controlEndpoint = info.GetControlEndpoint().GetUrl()
	}

	if *loggingEndpoint == "" {
		log.Fatal("No logging endpoint provided.")
	}
	if *artifactEndpoint == "" {
		log.Fatal("No artifact endpoint provided.")
	}
	if *controlEndpoint == "" {
		log.Fatal("No control endpoint provided.")
	}

	logger := &tools.Logger{Endpoint: *loggingEndpoint}
	logger.Printf(ctx, "Initializing java harness: %v", strings.Join(os.Args, " "))

	// (1) Obtain the pipeline options
	options, err := tools.ProtoToJSON(info.GetPipelineOptions())
	if err != nil {
		logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
	}

	// (2) Retrieve the staged user jars. We ignore any disk limit,
	// because the staged jars are mandatory.

	// Using the SDK Harness ID in the artifact destination path to make sure that dependencies used by multiple
	// SDK Harnesses in the same VM do not conflict. This is needed since some runners (for example, Dataflow)
	// may share the artifact staging directory across multiple SDK Harnesses
	// TODO(https://github.com/apache/beam/issues/20009): consider removing the SDK Harness ID from the staging path after Dataflow can properly
	// separate out dependencies per environment.
	dir := filepath.Join(*semiPersistDir, *id, "staged")

	artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
	if err != nil {
		logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err)
	}

	// (3) Invoke the Java harness, preserving artifact ordering in classpath.

	os.Setenv("HARNESS_ID", *id)
	if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
		logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)
	}
	os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
	os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
	os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))

	if info.GetStatusEndpoint() != nil {
		os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", info.GetStatusEndpoint().String())
	}

	const jarsDir = "/opt/apache/beam/jars"
	const javaHarnessJar = "beam-sdks-java-harness.jar"
	// Default logging bridge jars shipped in the container image; skipped
	// when the user opts to stage their own logging libraries.
	defaultLoggingJars := []string{
		"slf4j-api.jar",
		"slf4j-jdk14.jar",
		"jcl-over-slf4j.jar",
		"log4j-over-slf4j.jar",
		"log4j-to-slf4j.jar",
	}
	var cp []string
	if strings.Contains(options, "use_custom_logging_libraries") {
		// In this case, the logging libraries will be provided from the staged
		// artifacts.
		logger.Warnf(ctx, "Skipping default slf4j dependencies in classpath")
	} else {
		logger.Printf(ctx, "Using default slf4j dependencies in classpath")
		for _, jar := range defaultLoggingJars {
			cp = append(cp, filepath.Join(jarsDir, jar))
		}
	}

	hasWorkerExperiment := strings.Contains(options, "use_staged_dataflow_worker_jar")
	if hasWorkerExperiment {
		// Skip adding system "beam-sdks-java-harness.jar". User-provided jar will
		// be added to classpath as a normal user jar further below.
		logger.Printf(ctx, "Opted to use staged java harness. Make sure beam-sdks-java-harness is included or shaded in the staged jars.")
	} else {
		cp = append(cp, filepath.Join(jarsDir, javaHarnessJar))
	}

	// Append staged artifacts in order; the sha256 payload component is not
	// needed here, only the file name.
	for _, a := range artifacts {
		name, _ := artifact.MustExtractFilePayload(a)
		if hasWorkerExperiment && name == "dataflow-worker.jar" {
			// The legacy Dataflow worker jar is superseded by the staged harness.
			continue
		}
		cp = append(cp, filepath.Join(dir, filepath.FromSlash(name)))
	}

	// Determine the JVM heap ceiling (-Xmx), in bytes.
	var lim uint64
	if strings.Contains(options, "set_recommended_max_xmx") {
		// Fixed 32 GiB cap requested via pipeline option.
		lim = 32 << 30
	} else {
		size, err := syscallx.PhysicalMemorySize()
		if err != nil {
			// Best effort: fall back to HeapSizeLimit's behavior for size 0
			// instead of aborting, but surface the reason rather than
			// silently swallowing the error.
			logger.Warnf(ctx, "Failed to determine physical memory size, using default heap limit: %v", err)
			size = 0
		}
		lim = HeapSizeLimit(size)
	}

	args := []string{
		"-Xmx" + strconv.FormatUint(lim, 10),
		// ParallelGC the most adequate for high throughput and lower CPU utilization
		// It is the default GC in Java 8, but not on newer versions
		"-XX:+UseParallelGC",
		"-XX:+AlwaysActAsServerClassMachine",
		"-XX:-OmitStackTraceInFastThrow",
	}

	// Optionally attach the Cloud Profiler agent; it requires both job_name
	// and job_id from the provision metadata to label profiles.
	enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
	enableGoogleCloudHeapSampling := strings.Contains(options, enableGoogleCloudHeapSamplingOption)
	if enableGoogleCloudProfiler {
		if metadata := info.GetMetadata(); metadata != nil {
			if jobName, nameExists := metadata["job_name"]; nameExists {
				if jobId, idExists := metadata["job_id"]; idExists {
					if enableGoogleCloudHeapSampling {
						args = append(args, fmt.Sprintf(googleCloudProfilerAgentHeapArgs, jobName, jobId))
					} else {
						args = append(args, fmt.Sprintf(googleCloudProfilerAgentBaseArgs, jobName, jobId))
					}
					logger.Printf(ctx, "Turning on Cloud Profiling. Profile heap: %t", enableGoogleCloudHeapSampling)
				} else {
					logger.Printf(ctx, "Required job_id missing from metadata, profiling will not be enabled without it.")
				}
			} else {
				logger.Printf(ctx, "Required job_name missing from metadata, profiling will not be enabled without it.")
			}
		} else {
			logger.Printf(ctx, "enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled.")
		}
	}

	// The Jamm agent enables object-size measurement; users may disable it.
	disableJammAgent := strings.Contains(options, disableJammAgentOption)
	if disableJammAgent {
		logger.Printf(ctx, "Disabling Jamm agent. Measuring object size will be inaccurate.")
	} else {
		args = append(args, jammAgentArgs)
	}

	// Apply meta options
	const metaDir = "/opt/apache/beam/options"

	// Note: Error is deliberately unchecked so that parsing errors in meta
	// options do not abort the container; LoadMetaOptions logs its own
	// diagnostics. TODO: verify if it's intentional or not.
	metaOptions, _ := LoadMetaOptions(ctx, logger, metaDir)

	javaOptions := BuildOptions(ctx, logger, metaOptions)
	// (1) Add custom jvm arguments: "-server -Xmx1324 -XXfoo .."
	args = append(args, javaOptions.JavaArguments...)

	// (2) Add classpath: "-cp foo.jar:bar.jar:.."
	if len(javaOptions.Classpath) > 0 {
		cp = append(cp, javaOptions.Classpath...)
	}
	// A pathing jar sidesteps command-line length limits for long classpaths.
	pathingjar, err := makePathingJar(cp)
	if err != nil {
		logger.Fatalf(ctx, "makePathingJar failed: %v", err)
	}
	args = append(args, "-cp", pathingjar)

	// (3) Add (sorted) properties: "-Dbar=baz -Dfoo=bar .."
	// Sorting makes the command line deterministic across map iterations.
	var properties []string
	for key, value := range javaOptions.Properties {
		properties = append(properties, fmt.Sprintf("-D%s=%s", key, value))
	}
	sort.Strings(properties)
	args = append(args, properties...)

	if pipelineOptions, ok := info.GetPipelineOptions().GetFields()["options"]; ok {
		// Open modules specified in pipeline options
		if modules, ok := pipelineOptions.GetStructValue().GetFields()["jdkAddOpenModules"]; ok {
			for _, module := range modules.GetListValue().GetValues() {
				args = append(args, "--add-opens="+module.GetStringValue())
			}
		}
		// Add modules specified in pipeline options
		if modules, ok := pipelineOptions.GetStructValue().GetFields()["jdkAddRootModules"]; ok {
			for _, module := range modules.GetListValue().GetValues() {
				args = append(args, "--add-modules="+module.GetStringValue())
			}
		}
	}

	// Automatically open modules for Java 11+ when the agent jar is present
	// in the container image.
	openModuleAgentJar := "/opt/apache/beam/jars/open-module-agent.jar"
	if _, err := os.Stat(openModuleAgentJar); err == nil {
		args = append(args, "-javaagent:"+openModuleAgentJar)
	}
	args = append(args, "org.apache.beam.fn.harness.FnHarness")

	logger.Printf(ctx, "Executing: java %v", strings.Join(args, " "))
	logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...))
}