in pkg/updater/updater.go [570:748]
func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path, write bool, readCols ColumnReader, reprocess time.Duration) (bool, error) {
	log := alog.(logrus.Ext1FieldLogger) // Add trace method
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Grace period to read additional columns.
	var grace context.Context
	if deadline, present := ctx.Deadline(); present {
		var cancel context.CancelFunc
		dur := time.Until(deadline) / 2
		grace, cancel = context.WithTimeout(context.Background(), dur)
		defer cancel()
	} else {
		grace = context.Background()
	}
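	// Likewise allow shrinking the grid to run until three-quarters of the deadline has elapsed.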
	var shrinkGrace context.Context
	if deadline, present := ctx.Deadline(); present {
		var cancel context.CancelFunc
		dur := 3 * time.Until(deadline) / 4
		shrinkGrace, cancel = context.WithTimeout(context.Background(), dur)
		defer cancel()
	} else {
		shrinkGrace = context.Background()
	}
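	// Only keep results within the configured number of days (default 7).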
	var dur time.Duration
	if tg.DaysOfResults > 0 {
		dur = days(float64(tg.DaysOfResults))
	} else {
		dur = days(7)
	}
	stop := time.Now().Add(-dur)
	log = log.WithField("stop", stop)
	var oldCols []InflatedColumn
	var issues map[string][]string
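	// Download the existing grid, if any; a failure is logged and the update proceeds without the old columns.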
log.Trace("Downloading existing grid...")
old, attrs, err := gcs.DownloadGrid(ctx, client, gridPath)
if err != nil {
log.WithField("path", gridPath).WithError(err).Error("Failed to download existing grid")
}
inflateStart := time.Now()
if old != nil {
var cols []InflatedColumn
var err error
log.Trace("Inflating grid...")
if cols, issues, err = InflateGrid(ctx, old, stop, time.Now().Add(-reprocess)); err != nil {
return false, fmt.Errorf("inflate: %w", err)
}
var floor time.Time
when := time.Now().Add(-7 * 24 * time.Hour)
if col := reprocessColumn(log, old, tg, when); col != nil {
cols = append(cols, *col)
floor = when
}
SortStarted(cols) // Our processing requires descending start time.
oldCols = truncateRunning(cols, floor)
}
inflateDur := time.Since(inflateStart)
readColsStart := time.Now()
var cols []InflatedColumn
var unreadColumns bool
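	// If the existing grid is already at or above the size ceiling, skip reading new columns and recompress what we have.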
	if attrs != nil && attrs.Size >= int64(byteCeiling) {
		log.WithField("size", attrs.Size).Info("Grid too large, compressing...")
		unreadColumns = true
		cols = oldCols
	} else {
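		// Condition the eventual upload on the generation we just read (or on the object not existing)
		// so the write fails instead of clobbering a grid updated concurrently by someone else.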
		if condClient, ok := client.(gcs.ConditionalClient); ok {
			var cond storage.Conditions
			if attrs == nil {
				cond.DoesNotExist = true
			} else {
				cond.GenerationMatch = attrs.Generation
			}
			client = condClient.If(&cond, &cond)
		}
		newCols := make(chan InflatedColumn)
		ec := make(chan error)
		log.Trace("Reading first column...")
		go func() {
			err := readCols(ctx, log, tg, oldCols, stop, newCols)
			select {
			case <-ctx.Done():
			case ec <- err:
			}
		}()
		// Must read at least one column every cycle to ensure we make forward progress.
		more := true
		select {
		case <-ctx.Done():
			return false, fmt.Errorf("first column: %w", ctx.Err())
		case col := <-newCols:
			if len(col.Cells) == 0 {
				// Group all empty columns together by setting build/name empty.
				col.Column.Build = ""
				col.Column.Name = ""
			}
			cols = append(cols, col)
		case err := <-ec:
			if err != nil {
				return false, fmt.Errorf("read first column: %w", err)
			}
			more = false
		}
		// Read as many additional columns as we can within the allocated time.
		log.Trace("Reading additional columns...")
		for more {
			select {
			case <-grace.Done():
				unreadColumns = true
				more = false
			case <-ctx.Done():
				return false, ctx.Err()
			case col := <-newCols:
				if len(col.Cells) == 0 {
					// Group all empty columns together by setting build/name empty.
					col.Column.Build = ""
					col.Column.Name = ""
				}
				cols = append(cols, col)
			case err := <-ec:
				if err != nil {
					return false, fmt.Errorf("read columns: %w", err)
				}
				more = false
			}
		}
		log = log.WithField("appended", len(cols))
		overrideBuild(tg, cols) // so we group correctly
		cols = append(cols, oldCols...)
		cols = groupColumns(tg, cols)
	}
	readColsDur := time.Since(readColsStart)
	SortStarted(cols)
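	// Truncate and then shrink the grid so the serialized state stays under byteCeiling.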
	shrinkStart := time.Now()
	cols = truncateGrid(cols, byteCeiling) // Assume each cell is at least 1 byte
	var grid *statepb.Grid
	var buf []byte
	grid, buf, err = shrinkGridInline(shrinkGrace, log, tg, cols, issues, byteCeiling)
	if err != nil {
		return false, fmt.Errorf("shrink grid inline: %w", err)
	}
	shrinkDur := time.Since(shrinkStart)
	grid.Config = tg
	log = log.WithField("url", gridPath).WithField("bytes", len(buf))
	if !write {
		log = log.WithField("dryrun", true)
	} else {
		log.Debug("Writing grid...")
		// TODO(fejta): configurable cache value
		if _, err := client.Upload(ctx, gridPath, buf, gcs.DefaultACL, gcs.NoCache); err != nil {
			return false, fmt.Errorf("upload %d bytes: %w", len(buf), err)
		}
	}
	if unreadColumns {
		log = log.WithField("more", true)
	}
	log.WithFields(logrus.Fields{
		"cols":     len(grid.Columns),
		"rows":     len(grid.Rows),
		"inflate":  inflateDur,
		"readCols": readColsDur,
		"shrink":   shrinkDur,
	}).Info("Wrote grid")
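	// Report whether columns were left unread, meaning the grid is not yet fully up to date.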
	return unreadColumns, nil
}