go/cmd/aggregate-known/aggregate-known.go (196 lines of code) (raw):

package main import ( "bufio" "context" "encoding/json" "flag" "os" "os/signal" "path/filepath" "sync" "syscall" "time" "github.com/golang/glog" "github.com/mozilla/crlite/go" "github.com/mozilla/crlite/go/config" "github.com/mozilla/crlite/go/engine" "github.com/mozilla/crlite/go/rootprogram" "github.com/mozilla/crlite/go/storage" ) const ( permMode = 0644 permModeDir = 0755 ) var ( enrolledpath = flag.String("enrolledpath", "<path>", "input enrolled issuers JSON") knownpath = flag.String("knownpath", "<dir>", "output directory for <issuer> files") ctlogspath = flag.String("ctlogspath", "<path>", "output file for ct-log JSON") ctconfig = config.NewCTConfig() ) type knownWorkUnit struct { issuer types.Issuer issuerDN string expDates []types.ExpDate } type knownWorker struct { savePath string certDB storage.CertDatabase } func (kw knownWorker) run(ctx context.Context, wg *sync.WaitGroup, workChan <-chan knownWorkUnit) { defer wg.Done() err := os.MkdirAll(kw.savePath, permModeDir) if err != nil && !os.IsExist(err) { glog.Fatalf("Could not make directory %s: %s", kw.savePath, err) } for tuple := range workChan { // Wrap in anonymous function to defer a writer.Flush & fd.Close per work unit func() { path := filepath.Join(kw.savePath, tuple.issuer.ID()) fd, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, permMode) if err != nil { glog.Fatalf("[%s] Could not open known certificates file: %s", tuple.issuer.ID(), err) } defer fd.Close() writer := bufio.NewWriter(fd) defer writer.Flush() var serialCount uint64 for _, expDate := range tuple.expDates { select { case <-ctx.Done(): glog.Warningf("Signal on worker quit channel, quitting (count=%d).", serialCount) return default: } if expDate.IsExpiredAt(time.Now()) { if glog.V(1) { glog.Warningf("Date %s is expired now, skipping (issuer=%s)", expDate, tuple.issuer.ID()) } continue } // Sharded by expiry date, so this should be fairly small. knownSet, err := kw.certDB.ReadSerialsFromStorage(expDate, tuple.issuer) if err != nil { glog.Fatalf("[%s] Could not read serials with expDate=%s: %s", tuple.issuer.ID(), expDate.ID(), err) } knownSetLen := uint64(len(knownSet)) if knownSetLen == 0 { // This is almost certainly due to an hour-rollover since the loader ran, and expired all the next hour's // certs. glog.Warningf("No cached certificates for issuer=%s (%s) expDate=%s, but the loader thought there should be."+ " (current count this worker=%d)", tuple.issuerDN, tuple.issuer.ID(), expDate, serialCount) } serialCount += knownSetLen err = storage.WriteSerialList(writer, expDate, tuple.issuer, knownSet) if err != nil { glog.Fatalf("[%s] Could not write serials: %s", tuple.issuer.ID(), err) } } glog.Infof("[%s] %d total known serials for %s (shards=%d)", tuple.issuer.ID(), serialCount, tuple.issuerDN, len(tuple.expDates)) }() select { case <-ctx.Done(): return default: } } } func checkPathArg(strObj string, confOptionName string, ctconfig *config.CTConfig) { if strObj == "<path>" { glog.Errorf("Flag %s is not set", confOptionName) ctconfig.Usage() os.Exit(2) } } func main() { ctconfig.Init() ctx := context.Background() ctx, cancelMain := context.WithCancel(ctx) // Try to handle SIGINT and SIGTERM gracefully sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) defer close(sigChan) go func() { sig := <-sigChan glog.Infof("Signal caught: %s..", sig) cancelMain() signal.Stop(sigChan) // Restore default behavior }() certDB, cache := engine.GetConfiguredStorage(ctx, ctconfig, false) defer glog.Flush() checkPathArg(*enrolledpath, "enrolledpath", ctconfig) checkPathArg(*knownpath, "knownpath", ctconfig) checkPathArg(*ctlogspath, "ctlogspath", ctconfig) if err := os.MkdirAll(*knownpath, permModeDir); err != nil { glog.Fatalf("Unable to make the output directory: %s", err) } engine.PrepareTelemetry("aggregate-known", ctconfig) mozIssuers := rootprogram.NewMozillaIssuers() if err := mozIssuers.LoadEnrolledIssuers(*enrolledpath); err != nil { glog.Fatalf("Failed to load enrolled issuers from disk: %s", err) } glog.Infof("%d issuers loaded", len(mozIssuers.GetIssuers())) glog.Infof("Committing DB changes since last run") commitToken, err := cache.AcquireCommitLock() if err != nil || commitToken == nil { glog.Fatalf("Failed to acquire commit lock: %s", err) } defer cache.ReleaseCommitLock(*commitToken) err = certDB.Commit(*commitToken) if err != nil { glog.Fatalf("Error in commit: %s", err) } logList, err := certDB.GetCTLogsFromStorage() if err != nil { glog.Fatalf("Error reading coverage metadata: %s", err) } ctLogFD, err := os.OpenFile(*ctlogspath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { glog.Fatalf("Error opening %s: %s", *ctlogspath, err) } enc := json.NewEncoder(ctLogFD) if err := enc.Encode(logList); err != nil { glog.Fatalf("Error marshaling ct-logs list %s: %s", *ctlogspath, err) } ctLogFD.Close() glog.Infof("Listing issuers and their expiration dates...") issuerList, err := certDB.GetIssuerAndDatesFromStorage() if err != nil { glog.Fatal(err) } var count int64 for _, iObj := range issuerList { if mozIssuers.IsIssuerInProgram(iObj.Issuer) { count = count + int64(len(iObj.ExpDates)) } } workChan := make(chan knownWorkUnit, count) for _, iObj := range issuerList { if !mozIssuers.IsIssuerInProgram(iObj.Issuer) { continue } issuerSubj, err := mozIssuers.GetSubjectForIssuer(iObj.Issuer) if err != nil { glog.Warningf("Couldn't get subject for issuer=%s that is in the root program: %s", iObj.Issuer.ID(), err) issuerSubj = "<unknown>" } wu := knownWorkUnit{ issuer: iObj.Issuer, issuerDN: issuerSubj, expDates: iObj.ExpDates, } select { case workChan <- wu: default: glog.Fatalf("Channel overflow. Aborting at %+v", wu) } } // Signal that was the last work close(workChan) glog.Infof("Starting worker processes to handle %d work units", count) var wg sync.WaitGroup // Start the workers for t := 0; t < *ctconfig.NumThreads; t++ { wg.Add(1) worker := knownWorker{ savePath: *knownpath, certDB: certDB, } go worker.run(ctx, &wg, workChan) } wg.Wait() }