func()

in go/cmd/aggregate-known/aggregate-known.go [46:114]


// run drains workChan, writing one known-serials file per work unit into
// kw.savePath. Each file is named after the unit's issuer ID and is truncated
// and rewritten from scratch.
//
// For every expiry-date shard in a work unit that is not already expired, the
// serials are read from kw.certDB and appended to the file through
// storage.WriteSerialList.
//
// The worker stops when workChan is closed or ctx is cancelled. Cancellation
// is checked before each shard and after each work unit, so a cancelled worker
// may leave a file that covers only some of the issuer's shards. wg.Done is
// called when run returns.
//
// Failures to create the directory, open the output file, read a shard, or
// write a shard are fatal (glog.Fatalf). Failures to flush or close the output
// file are logged as errors.
func (kw knownWorker) run(ctx context.Context, wg *sync.WaitGroup, workChan <-chan knownWorkUnit) {
	defer wg.Done()

	err := os.MkdirAll(kw.savePath, permModeDir)
	if err != nil && !os.IsExist(err) {
		glog.Fatalf("Could not make directory %s: %s", kw.savePath, err)
	}

	for tuple := range workChan {
		// Wrap in an anonymous function so the flush and close happen once per
		// work unit rather than when run returns.
		func() {
			issuerID := tuple.issuer.ID()
			path := filepath.Join(kw.savePath, issuerID)

			fd, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, permMode)
			if err != nil {
				glog.Fatalf("[%s] Could not open known certificates file: %s", issuerID, err)
			}
			writer := bufio.NewWriter(fd)

			// Flush before closing, and report either failure: an unflushed
			// buffer or a failed close means the file on disk is incomplete.
			defer func() {
				if flushErr := writer.Flush(); flushErr != nil {
					glog.Errorf("[%s] Could not flush known certificates file %s: %s", issuerID, path, flushErr)
				}
				if closeErr := fd.Close(); closeErr != nil {
					glog.Errorf("[%s] Could not close known certificates file %s: %s", issuerID, path, closeErr)
				}
			}()

			var serialCount uint64

			for _, expDate := range tuple.expDates {
				// Non-blocking cancellation check before each shard.
				select {
				case <-ctx.Done():
					glog.Warningf("Signal on worker quit channel, quitting (count=%d).", serialCount)
					return
				default:
				}

				if expDate.IsExpiredAt(time.Now()) {
					if glog.V(1) {
						glog.Warningf("Date %s is expired now, skipping (issuer=%s)", expDate, issuerID)
					}
					continue
				}

				// Sharded by expiry date, so this should be fairly small.
				knownSet, err := kw.certDB.ReadSerialsFromStorage(expDate, tuple.issuer)
				if err != nil {
					glog.Fatalf("[%s] Could not read serials with expDate=%s: %s", issuerID, expDate.ID(), err)
				}
				knownSetLen := uint64(len(knownSet))

				if knownSetLen == 0 {
					// This is almost certainly due to an hour-rollover since the loader ran, and expired all the next hour's
					// certs.
					glog.Warningf("No cached certificates for issuer=%s (%s) expDate=%s, but the loader thought there should be."+
						" (current count this worker=%d)", tuple.issuerDN, issuerID, expDate, serialCount)
				}

				serialCount += knownSetLen
				err = storage.WriteSerialList(writer, expDate, tuple.issuer, knownSet)
				if err != nil {
					glog.Fatalf("[%s] Could not write serials: %s", issuerID, err)
				}
			}
			glog.Infof("[%s] %d total known serials for %s (shards=%d)", issuerID,
				serialCount, tuple.issuerDN, len(tuple.expDates))
		}()

		// Stop pulling further work units once cancellation has been signalled.
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}