oracle/cmd/pitr_agent/pitr_agent.go (93 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "context" "flag" "fmt" "net" "os" "strings" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/common" dbdpb "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/oracle" "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/pitr" pb "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/pitr/proto" pitrServer "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/pitr/server" "google.golang.org/grpc" "k8s.io/klog/v2" ) var port = flag.Int("port", 0, "The tcp port of a PITR Agent server.") var dbservice = flag.String("dbservice", "", "The DB service.") var dbport = flag.Int("dbport", 0, "The DB service port.") var dest = flag.String("dest", "", "The dest url to the replication destination location") var retentionDays = flag.Int("retentiondays", 7, "how long(in days) PITR need to retain redo logs") func main() { klog.InitFlags(nil) flag.Parse() if !strings.HasPrefix(*dest, "gs://") { klog.Error("invalid dest url for replication, only support GCS in the current release", "dest", *dest) os.Exit(1) } lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { klog.ErrorS(err, "PITR Agent failed to listen") os.Exit(1) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() conn, err := common.DatabaseDaemonDialService(ctx, fmt.Sprintf("%s:%d", *dbservice, *dbport), grpc.WithBlock()) if err != nil { klog.ErrorS(err, "PITR Agent failed to connect to dbdaemon") os.Exit(1) } defer conn.Close() dbdClient := dbdpb.NewDatabaseDaemonClient(conn) if err := pitr.SetArchiveLag(ctx, dbdClient); err != nil { klog.ErrorS(err, "failed to set the archive lag parameter") os.Exit(1) } mDir := *dest if !strings.HasSuffix(*dest, "/") { mDir = *dest + "/" } hashStore, err := pitr.NewSimpleStore(ctx, mDir+"hash/") if err != nil { klog.ErrorS(err, "failed to create hash store") os.Exit(1) } defer hashStore.Close(ctx) metadataStore, err := pitr.NewSimpleStore(ctx, mDir) if err != nil { klog.ErrorS(err, "failed to create metadata store") os.Exit(1) } defer metadataStore.Close(ctx) go func() { if err := pitr.RunLogReplication(ctx, dbdClient, *dest, hashStore); err != nil { klog.Error(err, "failed to start log replication") } cancel() }() go func() { if err := pitr.RunMetadataUpdate(ctx, dbdClient, hashStore, metadataStore); err != nil { klog.Error(err, "failed to start metadata update") } cancel() }() go func() { if err := pitr.RunLogRetention(ctx, *retentionDays, metadataStore, hashStore); err != nil { klog.Error(err, "failed to start log retention") } cancel() }() grpcSvr := grpc.NewServer() pb.RegisterPITRAgentServer(grpcSvr, &pitrServer.PITRServer{DBService: *dbservice, DBPort: *dbport, MetadataStore: metadataStore}) go func() { klog.InfoS("Starting PITR Agent", "port", *port) if err := grpcSvr.Serve(lis); err != nil { klog.ErrorS(err, "PITR Agent failed to start") } cancel() }() <-ctx.Done() klog.Info("Exiting PITR agent") grpcSvr.Stop() }