gateway/gateway.go (130 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package gateway implements a REST gateway for the emulator gRPC service.
package gateway
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
"time"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/grpc"
instancepb "cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
lrgw "cloud_spanner_emulator/gateway/longrunning_operations_gateway"
dagw "cloud_spanner_emulator/gateway/spanner_admin_database_gateway"
spgw "cloud_spanner_emulator/gateway/spanner_gateway"
iagw "cloud_spanner_emulator/gateway/spanner_admin_instance_gateway"
)
// Options encapsulates options for the emulator gateway.
//
// The address and binary fields configure the gateway itself; the remaining
// fields are translated into command-line flags for the emulator process that
// Run launches.
type Options struct {
// GatewayAddress is the address the REST (HTTP) server listens on.
GatewayAddress string
// FrontendBinary is the emulator gRPC server executable that Run launches.
FrontendBinary string
// FrontendAddress is passed to the emulator as --host_port and is the
// endpoint the gateway dials to reach the gRPC server.
FrontendAddress string
// CopyEmulatorStdout, when true, connects the emulator's stdout to the
// gateway's stdout.
CopyEmulatorStdout bool
// CopyEmulatorStderr, when true, connects the emulator's stderr to the
// gateway's stderr.
CopyEmulatorStderr bool
// LogRequests, when true, passes --log_requests to the emulator.
LogRequests bool
// EnableFaultInjection, when true, passes --enable_fault_injection to the
// emulator.
EnableFaultInjection bool
// DisableQueryNullFilteredIndexCheck, when true, passes
// --disable_query_null_filtered_index_check to the emulator.
DisableQueryNullFilteredIndexCheck bool
// OverrideMaxDatabasesPerInstance is always forwarded to the emulator as
// --override_max_databases_per_instance=<value>.
OverrideMaxDatabasesPerInstance int
// OverrideChangeStreamPartitionTokenAliveSeconds is always forwarded to the
// emulator as --override_change_stream_partition_token_alive_seconds=<value>.
OverrideChangeStreamPartitionTokenAliveSeconds int
}
// Gateway implements the emulator gateway server.
//
// A Gateway launches the emulator gRPC server as a child process and serves a
// REST proxy in front of it; see Run.
type Gateway struct {
// opts holds the configuration supplied to New.
opts Options
}
// New builds a Gateway configured with opts. Nothing is started until Run is
// called on the returned value.
func New(opts Options) *Gateway {
	gw := new(Gateway)
	gw.opts = opts
	return gw
}
// Run starts the emulator gateway server.
//
// Run launches the emulator gRPC server (Options.FrontendBinary) as a child
// process, waits for it to answer requests, registers the REST handlers that
// proxy to it, and then serves HTTP on Options.GatewayAddress. Run never
// returns; the process exits when:
//   - an interrupt signal is received: the emulator is killed and the gateway
//     exits with code 0;
//   - the emulator terminates on its own: the gateway exits with the
//     emulator's exit code;
//   - a startup or serving step fails: the error is logged, the emulator is
//     killed, and the gateway exits with code 1.
func (gw *Gateway) Run() {
	// Build the emulator command line from the gateway options.
	emulatorArgs := []string{
		"--host_port", gw.opts.FrontendAddress,
	}
	if gw.opts.LogRequests {
		emulatorArgs = append(emulatorArgs, "--log_requests")
	}
	if gw.opts.EnableFaultInjection {
		emulatorArgs = append(emulatorArgs, "--enable_fault_injection")
	}
	if gw.opts.DisableQueryNullFilteredIndexCheck {
		emulatorArgs = append(emulatorArgs, "--disable_query_null_filtered_index_check")
	}
	// The override flags are always forwarded, whatever their value.
	emulatorArgs = append(emulatorArgs,
		fmt.Sprintf("--override_max_databases_per_instance=%d",
			gw.opts.OverrideMaxDatabasesPerInstance),
		fmt.Sprintf("--override_change_stream_partition_token_alive_seconds=%d",
			gw.opts.OverrideChangeStreamPartitionTokenAliveSeconds))
	cmd := exec.Command(gw.opts.FrontendBinary, emulatorArgs...)

	// Proxy emulator log to gateway log.
	if gw.opts.CopyEmulatorStdout {
		cmd.Stdout = os.Stdout
	}
	if gw.opts.CopyEmulatorStderr {
		cmd.Stderr = os.Stderr
	}

	// Start the gRPC server; this does not block until the server is up.
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}

	// exited is closed once the emulator process has terminated and been
	// reaped. Wait's error is deliberately ignored: the exit status is read
	// from cmd.ProcessState where it is needed.
	exited := make(chan struct{})
	go func() {
		cmd.Wait()
		close(exited)
	}()

	// stopEmulator kills the emulator and waits for it to be reaped so that
	// it does not outlive the gateway. Process.Release must NOT be called
	// here: it renders the Process unusable, which would make Kill fail and
	// leave the emulator running.
	stopEmulator := func() {
		if err := cmd.Process.Kill(); err != nil {
			// The process has already finished; there is nothing to wait for.
			return
		}
		<-exited
	}

	// stopping is closed when the gateway itself initiates a fatal shutdown,
	// so that the watcher below does not race it to os.Exit.
	stopping := make(chan struct{})

	// Tie the lifetimes of the two servers together: terminate the gRPC
	// server if the gateway is interrupted, and terminate the gateway if the
	// gRPC server goes away. A single goroutine owns both exits so the exit
	// code is deterministic.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)
	go func() {
		select {
		case <-interrupts:
			stopEmulator()
			os.Exit(0)
		case <-exited:
			select {
			case <-stopping:
				// The emulator was killed by fatal, which performs the exit.
				return
			default:
			}
			log.Println("Shutting down gateway server since grpc server is terminated.")
			os.Exit(cmd.ProcessState.ExitCode())
		}
	}()

	// fatal reports err and exits with code 1, like log.Fatal, but first
	// makes sure the emulator child process is not left running.
	fatal := func(err error) {
		close(stopping)
		log.Print(err)
		stopEmulator()
		os.Exit(1)
	}

	// Wait for the gRPC server to be up.
	ctx := context.Background()
	addr := gw.opts.FrontendAddress
	if err := waitForReady(ctx, addr); err != nil {
		fatal(fmt.Errorf("Error waiting for emulator to start: %v", err))
	}

	// Set up the gateway services. Each generated registration function has
	// the same signature, so they are applied in a loop.
	mux := runtime.NewServeMux(
		runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{}))
	opts := []grpc.DialOption{grpc.WithInsecure()}
	registrations := []func(context.Context, *runtime.ServeMux, string, []grpc.DialOption) error{
		spgw.RegisterSpannerHandlerFromEndpoint,
		iagw.RegisterInstanceAdminHandlerFromEndpoint,
		dagw.RegisterDatabaseAdminHandlerFromEndpoint,
		lrgw.RegisterOperationsHandlerFromEndpoint,
	}
	for _, register := range registrations {
		if err := register(ctx, mux, addr, opts); err != nil {
			fatal(err)
		}
	}

	// Start the gateway http server.
	log.Println("Cloud Spanner emulator running.")
	log.Println("REST server listening at", gw.opts.GatewayAddress)
	log.Println("gRPC server listening at", gw.opts.FrontendAddress)
	if err := http.ListenAndServe(gw.opts.GatewayAddress, mux); err != nil {
		fatal(err)
	}
}
// waitForReady blocks until the emulator gRPC server at endpoint answers a
// request. It returns an error if the server cannot be reached, or does not
// answer, within a fixed 30 second deadline (or before ctx is done, if that
// happens sooner).
func waitForReady(ctx context.Context, endpoint string) error {
	const timeout = 30 * time.Second
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Dial with the deadline-bound context: with WithBlock, a plain Dial
	// would wait indefinitely for a server that never starts listening.
	conn, err := grpc.DialContext(ctx, endpoint, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return fmt.Errorf("emulator failed to come up at %v within %v deadline: %w",
			endpoint, timeout, err)
	}
	defer conn.Close()

	// To test whether the server is up, wait for ListInstanceConfigs to respond
	// for a dummy project.
	instanceAdminClient := instancepb.NewInstanceAdminClient(conn)
	req := &instancepb.ListInstanceConfigsRequest{
		Parent: "projects/test-project",
	}
	if _, err := instanceAdminClient.ListInstanceConfigs(ctx, req); err != nil {
		return fmt.Errorf("emulator failed to come up at %v within %v deadline: %w",
			endpoint, timeout, err)
	}
	return nil
}