sdks/go/container/pool/workerpool.go (106 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package pool facilitates a external worker service, as an alternate mode for
// the standard Beam container.
//
// This is predeominantly to serve as a process spawner within a given container
// VM for an arbitrary number of jobs, instead of for a single worker instance.
//
// Workers will be spawned as executed OS processes.
package pool
import (
"context"
"fmt"
"log/slog"
"net"
"os"
"os/exec"
"sync"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"google.golang.org/grpc"
)
// New initializes a process based ExternalWorkerService, at the given
// port.
func New(ctx context.Context, port int, containerExecutable string) (*Process, error) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return nil, err
}
slog.Info("starting Process server", "addr", lis.Addr())
grpcServer := grpc.NewServer()
root, cancel := context.WithCancel(ctx)
s := &Process{lis: lis, root: root, rootCancel: cancel, workers: map[string]context.CancelFunc{},
grpcServer: grpcServer, containerExecutable: containerExecutable}
fnpb.RegisterBeamFnExternalWorkerPoolServer(grpcServer, s)
return s, nil
}
// ServeAndWait starts the ExternalWorkerService and blocks until exit.
func (s *Process) ServeAndWait() error {
return s.grpcServer.Serve(s.lis)
}
// Process implements fnpb.BeamFnExternalWorkerPoolServer, by starting external
// processes.
type Process struct {
fnpb.UnimplementedBeamFnExternalWorkerPoolServer
containerExecutable string // The host for the container executable.
lis net.Listener
root context.Context
rootCancel context.CancelFunc
mu sync.Mutex
workers map[string]context.CancelFunc
grpcServer *grpc.Server
}
// StartWorker initializes a new worker harness, implementing BeamFnExternalWorkerPoolServer.StartWorker.
func (s *Process) StartWorker(_ context.Context, req *fnpb.StartWorkerRequest) (*fnpb.StartWorkerResponse, error) {
slog.Info("starting worker", "id", req.GetWorkerId())
s.mu.Lock()
defer s.mu.Unlock()
if s.workers == nil {
return &fnpb.StartWorkerResponse{
Error: "worker pool shutting down",
}, nil
}
if _, ok := s.workers[req.GetWorkerId()]; ok {
return &fnpb.StartWorkerResponse{
Error: fmt.Sprintf("worker with ID %q already exists", req.GetWorkerId()),
}, nil
}
if req.GetLoggingEndpoint() == nil {
return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Missing logging endpoint for worker %v", req.GetWorkerId())}, nil
}
if req.GetControlEndpoint() == nil {
return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Missing control endpoint for worker %v", req.GetWorkerId())}, nil
}
if req.GetLoggingEndpoint().Authentication != nil || req.GetControlEndpoint().Authentication != nil {
return &fnpb.StartWorkerResponse{Error: "[BEAM-10610] Secure endpoints not supported."}, nil
}
ctx := grpcx.WriteWorkerID(s.root, req.GetWorkerId())
ctx, s.workers[req.GetWorkerId()] = context.WithCancel(ctx)
args := []string{
"--id=" + req.GetWorkerId(),
"--control_endpoint=" + req.GetControlEndpoint().GetUrl(),
"--artifact_endpoint=" + req.GetArtifactEndpoint().GetUrl(),
"--provision_endpoint=" + req.GetProvisionEndpoint().GetUrl(),
"--logging_endpoint=" + req.GetLoggingEndpoint().GetUrl(),
}
cmd := exec.CommandContext(ctx, s.containerExecutable, args...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = nil // Use the current environment.
if err := cmd.Start(); err != nil {
return &fnpb.StartWorkerResponse{Error: fmt.Sprintf("Unable to start boot for worker %v: %v", req.GetWorkerId(), err)}, nil
}
return &fnpb.StartWorkerResponse{}, nil
}
// StopWorker terminates a worker harness, implementing BeamFnExternalWorkerPoolServer.StopWorker.
func (s *Process) StopWorker(_ context.Context, req *fnpb.StopWorkerRequest) (*fnpb.StopWorkerResponse, error) {
slog.Info("stopping worker", "id", req.GetWorkerId())
s.mu.Lock()
defer s.mu.Unlock()
if s.workers == nil {
// Worker pool is already shutting down, so no action is needed.
return &fnpb.StopWorkerResponse{}, nil
}
if cancelfn, ok := s.workers[req.GetWorkerId()]; ok {
cancelfn()
delete(s.workers, req.GetWorkerId())
return &fnpb.StopWorkerResponse{}, nil
}
return &fnpb.StopWorkerResponse{
Error: fmt.Sprintf("no worker with id %q running", req.GetWorkerId()),
}, nil
}
// Stop terminates the service and stops all workers.
func (s *Process) Stop(ctx context.Context) error {
s.mu.Lock()
slog.Debug("stopping Process", "worker_count", len(s.workers))
s.workers = nil
s.rootCancel()
// There can be a deadlock between the StopWorker RPC and GracefulStop
// which waits for all RPCs to finish, so it must be outside the critical section.
s.mu.Unlock()
s.grpcServer.GracefulStop()
return nil
}