main.go (74 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package main import ( "time" "github.com/uber/storagetapper/changelog" "github.com/uber/storagetapper/config" "github.com/uber/storagetapper/db" "github.com/uber/storagetapper/encoder" "github.com/uber/storagetapper/log" "github.com/uber/storagetapper/metrics" "github.com/uber/storagetapper/pipe" "github.com/uber/storagetapper/pool" "github.com/uber/storagetapper/server" "github.com/uber/storagetapper/shutdown" "github.com/uber/storagetapper/state" "github.com/uber/storagetapper/streamer" "github.com/uber/storagetapper/types" "golang.org/x/net/context" ) var version, revision = "1.0", "" func idle(interval time.Duration) { metrics.IdleWorkers.Inc() defer metrics.IdleWorkers.Dec() log.Debugf("going idle") select { case <-shutdown.InitiatedCh(): case <-time.After(interval): } } func worker(ctx context.Context, cfg *config.AppConfig, inP pipe.Pipe, tpool pool.Thread) { log.Debugf("Started worker thread, Total: %+v", shutdown.NumProcs()+1) for !shutdown.Initiated() && !tpool.Terminate() { if !changelog.Worker(ctx, cfg, inP, tpool) && !streamer.Worker(cfg, inP) { idle(cfg.WorkerIdleInterval) } } log.Debugf("Finished worker thread. Threads remaining %v", shutdown.NumProcs()+1) } // mainLow extracted for the tests to be able to run with different configurations func mainLow(cfg *config.AppConfig) { env := config.Environment() log.Configure(cfg.LogType, cfg.LogLevel, env == config.EnvProduction || env == config.EnvStaging) log.Infof("%s Version: %s, Revision: %s", types.MySvcName, version, revision) log.Debugf("Config: %+v", cfg) types.MyDBName = cfg.StateDBName types.MyClusterName = cfg.StateClusterName err := metrics.Init() log.F(err) shutdown.Setup() //This is to resolve package cyclic dependencies encoder.GetLatestSchema = state.SchemaGet db.BuiltinResolveCluster = state.ConnectInfoGet if err := state.InitManager(shutdown.Context, cfg); err != nil { log.F(err) } go server.StartHTTPServer(cfg.PortDyn) nprocs := uint(cfg.MaxNumProcs) if cfg.ChangelogPipeType == "local" { nprocs = 1 /*Start changelog reader only, it'll control the size of the thread pool*/ } //Increasing batch size is important to prevent Pipes from preserving //offsets before batch of the size batch_size has been committed to the //output pipe //There may be total 2*batch_size+1 events in flight // * batch_size - in outP batch buffer // * batch_size - in streamer helper channel buffer // * +1 - waiting in streamer helper to be pushed when buffer is full p := cfg.Pipe p.MaxBatchSize = 2*p.MaxBatchSize + 1 inP, err := pipe.Create(cfg.ChangelogPipeType, &p, state.GetDB()) log.F(err) tp := pool.Create() tp.Start(nprocs, func() { worker(shutdown.Context, cfg, inP, tp) }) shutdown.Wait() server.Shutdown() pipe.CacheDestroy() log.Debugf("FINISHED") } func main() { cfg := config.Get() mainLow(cfg) }