func reportWriter()

in forwarder/main.go [123:154]


func reportWriter(ctx context.Context, ch chan *securityreport.SecurityReport) {
	t := time.NewTicker(interval)
	table := bt.Open(tableName)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			size := min(len(ch), btWriteBufSize)
			// log.Printf("buffered %v reports", size)
			if size == 0 {
				// log.Printf("no reports buffered. skipping.")
				continue
			}
			muts := make([]*bigtable.Mutation, size)
			keys := make([]string, size)
			for i := 0; i < size; i++ {
				muts[i] = bigtable.NewMutation()
				r := <-ch
				setSecurityReport(muts[i], r)
				keys[i] = generateRowKey(r)
			}
			rowErrs, err := table.ApplyBulk(ctx, keys, muts)
			if err != nil {
				log.Printf("could not apply bulk row mutation: %v", err)
			}
			for _, rowErr := range rowErrs {
				log.Printf("error writing row: %v", rowErr)
			}
			log.Printf("wrote %v reports", size)
		}
	}
}