in cmd/worker/main.go [44:156]
// main is the entry point of the worker binary. It loads the configuration,
// opens the database, builds the index, proxy and source clients and the fetch
// queue, constructs the worker server, registers its handlers and metric
// views, and then serves HTTP on the configured host address. Any failure
// during setup is fatal.
func main() {
	flag.Parse()
	ctx := context.Background()
	cfg, err := config.Init(ctx)
	if err != nil {
		log.Fatal(ctx, err)
	}
	cfg.Dump(os.Stdout)
	if cfg.UseProfiler {
		if err := profiler.Start(profiler.Config{}); err != nil {
			log.Fatalf(ctx, "profiler.Start: %v", err)
		}
	}
	db, err := cmdconfig.OpenDB(ctx, cfg, *bypassLicenseCheck)
	if err != nil {
		log.Fatalf(ctx, "%v", err)
	}
	defer db.Close()
	populateExcluded(ctx, db)
	indexClient, err := index.New(cfg.IndexURL)
	if err != nil {
		log.Fatal(ctx, err)
	}
	proxyClient, err := proxy.New(cfg.ProxyURL)
	if err != nil {
		log.Fatal(ctx, err)
	}
	sourceClient := source.NewClient(config.SourceTimeout)
	expg := cmdconfig.ExperimentGetter(ctx, cfg)
	// The queue's callback fetches one module version and records the
	// resulting state; only the status code and error are reported back.
	fetchQueue, err := queue.New(ctx, cfg, queueName, *workers, expg,
		func(ctx context.Context, modulePath, version string) (int, error) {
			f := &worker.Fetcher{
				ProxyClient:  proxyClient,
				SourceClient: sourceClient,
				DB:           db,
			}
			code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
			return code, err
		})
	if err != nil {
		log.Fatalf(ctx, "queue.New: %v", err)
	}
	reportingClient := cmdconfig.ReportingClient(ctx, cfg)
	redisCacheClient := getCacheRedis(ctx, cfg)
	redisBetaCacheClient := getBetaCacheRedis(ctx, cfg)
	experimenter := cmdconfig.Experimenter(ctx, cfg, expg, reportingClient)
	server, err := worker.NewServer(cfg, worker.ServerConfig{
		DB:                   db,
		IndexClient:          indexClient,
		ProxyClient:          proxyClient,
		SourceClient:         sourceClient,
		RedisCacheClient:     redisCacheClient,
		RedisBetaCacheClient: redisBetaCacheClient,
		Queue:                fetchQueue,
		ReportingClient:      reportingClient,
		// NOTE(review): assumes a "static" flag is registered elsewhere in
		// this file; flag.Lookup returns nil for an unknown flag.
		StaticPath:     template.TrustedSourceFromFlag(flag.Lookup("static").Value),
		GetExperiments: experimenter.Experiments,
	})
	if err != nil {
		log.Fatal(ctx, err)
	}
	router := dcensus.NewRouter(nil)
	server.Install(router.Handle)
	// Cap the shared slice at its own length so that append always copies and
	// can never write into the backing array of dcensus.ServerViews.
	baseViews := dcensus.ServerViews[:len(dcensus.ServerViews):len(dcensus.ServerViews)]
	views := append(baseViews,
		worker.EnqueueResponseCount,
		worker.ProcessingLag,
		worker.UnprocessedModules,
		worker.UnprocessedNewModules,
		worker.DBProcesses,
		worker.DBWaitingProcesses,
		worker.SheddedFetchCount,
		worker.FetchLatencyDistribution,
		worker.FetchResponseCount,
		worker.FetchPackageCount)
	if err := dcensus.Init(cfg, views...); err != nil {
		log.Fatal(ctx, err)
	}
	// We are not currently forwarding any ports on AppEngine, so serving debug
	// information is broken.
	if !cfg.OnAppEngine() {
		dcensusServer, err := dcensus.NewServer()
		if err != nil {
			log.Fatal(ctx, err)
		}
		debugAddr := cfg.DebugAddr("localhost:8001")
		// Serve debug information in the background. ListenAndServe always
		// returns a non-nil error; log it rather than dropping it so that a
		// failed debug server (e.g. address already in use) is visible.
		go func() {
			if err := http.ListenAndServe(debugAddr, dcensusServer); err != nil {
				log.Errorf(ctx, "debug server on %s: %v", debugAddr, err)
			}
		}()
	}
	// Only validate the IAP header when an audience is configured through the
	// environment; otherwise use the pass-through middleware.
	iap := middleware.Identity()
	if aud := os.Getenv("GO_DISCOVERY_IAP_AUDIENCE"); aud != "" {
		iap = middleware.ValidateIAPHeader(aud)
	}
	mw := middleware.Chain(
		middleware.RequestLog(cmdconfig.Logger(ctx, cfg, "worker-log")),
		middleware.Timeout(time.Duration(timeout)*time.Minute),
		iap,
		middleware.Experiment(experimenter),
	)
	http.Handle("/", mw(router))
	addr := cfg.HostAddr("localhost:8000")
	log.Infof(ctx, "Timeout is %d minutes", timeout)
	log.Infof(ctx, "Listening on addr %s", addr)
	// A nil handler makes ListenAndServe use http.DefaultServeMux, which is
	// where the handler above was registered.
	log.Fatal(ctx, http.ListenAndServe(addr, nil))
}