in go/cmd/aggregate-known/aggregate-known.go [46:114]
// run consumes work units from workChan until the channel is closed or ctx is
// cancelled. For each unit it (re)creates a file named after the issuer ID
// under kw.savePath and writes the serials read from kw.certDB for every
// non-expired expiration date in the unit.
//
// wg.Done is called when run returns. Failures to create the directory, open
// the output file, read serials, or write serials are reported through
// glog.Fatalf. Failures while flushing or closing the output file are logged
// as errors so that an incomplete file never goes unnoticed.
func (kw knownWorker) run(ctx context.Context, wg *sync.WaitGroup, workChan <-chan knownWorkUnit) {
	defer wg.Done()

	err := os.MkdirAll(kw.savePath, permModeDir)
	if err != nil && !os.IsExist(err) {
		glog.Fatalf("Could not make directory %s: %s", kw.savePath, err)
	}

	for tuple := range workChan {
		// Wrap in an anonymous function so the flush and close of the output
		// file are deferred per work unit rather than until run returns.
		func() {
			path := filepath.Join(kw.savePath, tuple.issuer.ID())
			fd, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, permMode)
			if err != nil {
				glog.Fatalf("[%s] Could not open known certificates file: %s", tuple.issuer.ID(), err)
			}
			writer := bufio.NewWriter(fd)

			// Flush buffered data first, then close the descriptor. Both can
			// fail (e.g. disk full, delayed write errors), and either failure
			// means the file on disk may be incomplete, so report them
			// instead of dropping the errors.
			defer func() {
				if flushErr := writer.Flush(); flushErr != nil {
					glog.Errorf("[%s] Could not flush known certificates file %s: %s",
						tuple.issuer.ID(), path, flushErr)
				}
				if closeErr := fd.Close(); closeErr != nil {
					glog.Errorf("[%s] Could not close known certificates file %s: %s",
						tuple.issuer.ID(), path, closeErr)
				}
			}()

			var serialCount uint64
			for _, expDate := range tuple.expDates {
				// Non-blocking cancellation check between shards.
				select {
				case <-ctx.Done():
					glog.Warningf("Signal on worker quit channel, quitting (count=%d).", serialCount)
					return
				default:
				}

				if expDate.IsExpiredAt(time.Now()) {
					if glog.V(1) {
						glog.Warningf("Date %s is expired now, skipping (issuer=%s)", expDate, tuple.issuer.ID())
					}
					continue
				}

				// Sharded by expiry date, so this should be fairly small.
				knownSet, err := kw.certDB.ReadSerialsFromStorage(expDate, tuple.issuer)
				if err != nil {
					glog.Fatalf("[%s] Could not read serials with expDate=%s: %s", tuple.issuer.ID(), expDate.ID(), err)
				}

				knownSetLen := uint64(len(knownSet))
				if knownSetLen == 0 {
					// This is almost certainly due to an hour-rollover since the
					// loader ran, which expired all of the next hour's certs.
					glog.Warningf("No cached certificates for issuer=%s (%s) expDate=%s, but the loader thought there should be."+
						" (current count this worker=%d)", tuple.issuerDN, tuple.issuer.ID(), expDate, serialCount)
				}
				serialCount += knownSetLen

				err = storage.WriteSerialList(writer, expDate, tuple.issuer, knownSet)
				if err != nil {
					glog.Fatalf("[%s] Could not write serials: %s", tuple.issuer.ID(), err)
				}
			}

			glog.Infof("[%s] %d total known serials for %s (shards=%d)", tuple.issuer.ID(),
				serialCount, tuple.issuerDN, len(tuple.expDates))
		}()

		// Stop taking new work units once cancellation has been requested.
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}