in forwarder/main.go [123:154]
func reportWriter(ctx context.Context, ch chan *securityreport.SecurityReport) {
t := time.NewTicker(interval)
table := bt.Open(tableName)
defer t.Stop()
for {
select {
case <-t.C:
size := min(len(ch), btWriteBufSize)
// log.Printf("buffered %v reports", size)
if size == 0 {
// log.Printf("no reports buffered. skipping.")
continue
}
muts := make([]*bigtable.Mutation, size)
keys := make([]string, size)
for i := 0; i < size; i++ {
muts[i] = bigtable.NewMutation()
r := <-ch
setSecurityReport(muts[i], r)
keys[i] = generateRowKey(r)
}
rowErrs, err := table.ApplyBulk(ctx, keys, muts)
if err != nil {
log.Printf("could not apply bulk row mutation: %v", err)
}
for _, rowErr := range rowErrs {
log.Printf("error writing row: %v", rowErr)
}
log.Printf("wrote %v reports", size)
}
}
}