func InflateDropAppend()

in pkg/updater/updater.go [570:748]


// InflateDropAppend updates the grid stored at gridPath for test group tg.
//
// It downloads and inflates the existing grid, drops columns older than the
// configured retention window, appends freshly read columns from readCols
// (at least one per cycle to guarantee forward progress, then as many more
// as fit inside a grace period), regroups and sorts the result, shrinks it
// under byteCeiling, and — when write is true — uploads the new grid.
//
// Returns true when columns remain unread (the group should be processed
// again soon), plus any fatal error. Download failures of the existing grid
// are logged and tolerated; the grid is then rebuilt from scratch.
func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path, write bool, readCols ColumnReader, reprocess time.Duration) (bool, error) {
	log := alog.(logrus.Ext1FieldLogger) // Add trace method
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Grace period to read additional columns: half the remaining deadline.
	// Rooted at Background (not ctx) so expiry only stops column reading,
	// leaving the rest of the budget for shrinking and uploading.
	var grace context.Context
	if deadline, present := ctx.Deadline(); present {
		var cancel context.CancelFunc
		dur := time.Until(deadline) / 2
		grace, cancel = context.WithTimeout(context.Background(), dur)
		defer cancel()
	} else {
		grace = context.Background()
	}

	// Shrinking gets a longer leash: three quarters of the remaining deadline.
	var shrinkGrace context.Context
	if deadline, present := ctx.Deadline(); present {
		var cancel context.CancelFunc
		dur := 3 * time.Until(deadline) / 4
		shrinkGrace, cancel = context.WithTimeout(context.Background(), dur)
		defer cancel()
	} else {
		shrinkGrace = context.Background()
	}

	// Retention window: configured days of results, defaulting to a week.
	var dur time.Duration
	if tg.DaysOfResults > 0 {
		dur = days(float64(tg.DaysOfResults))
	} else {
		dur = days(7)
	}

	stop := time.Now().Add(-dur) // Columns starting before stop are dropped.
	log = log.WithField("stop", stop)

	var oldCols []InflatedColumn
	var issues map[string][]string

	log.Trace("Downloading existing grid...")
	old, attrs, err := gcs.DownloadGrid(ctx, client, gridPath)
	if err != nil {
		// Tolerated: a missing/corrupt grid is rebuilt from scratch below.
		log.WithField("path", gridPath).WithError(err).Error("Failed to download existing grid")
	}
	inflateStart := time.Now()
	if old != nil {
		var cols []InflatedColumn
		var err error
		log.Trace("Inflating grid...")
		if cols, issues, err = InflateGrid(ctx, old, stop, time.Now().Add(-reprocess)); err != nil {
			return false, fmt.Errorf("inflate: %w", err)
		}
		var floor time.Time
		when := time.Now().Add(-7 * 24 * time.Hour)
		if col := reprocessColumn(log, old, tg, when); col != nil {
			cols = append(cols, *col)
			floor = when
		}
		SortStarted(cols) // Our processing requires descending start time.
		oldCols = truncateRunning(cols, floor)
	}
	inflateDur := time.Since(inflateStart)
	readColsStart := time.Now()
	var cols []InflatedColumn
	var unreadColumns bool
	if attrs != nil && attrs.Size >= int64(byteCeiling) {
		// Grid already at the size ceiling: skip reading and just recompress.
		log.WithField("size", attrs.Size).Info("Grid too large, compressing...")
		unreadColumns = true
		cols = oldCols
	} else {
		// Guard the eventual upload against concurrent writers: only write if
		// the object is unchanged (or still absent) since we downloaded it.
		if condClient, ok := client.(gcs.ConditionalClient); ok {
			var cond storage.Conditions
			if attrs == nil {
				cond.DoesNotExist = true
			} else {
				cond.GenerationMatch = attrs.Generation
			}
			client = condClient.If(&cond, &cond)
		}

		newCols := make(chan InflatedColumn)
		ec := make(chan error)

		// Group all empty columns together by setting build/name empty.
		normalize := func(col InflatedColumn) InflatedColumn {
			if len(col.Cells) == 0 {
				col.Column.Build = ""
				col.Column.Name = ""
			}
			return col
		}

		log.Trace("Reading first column...")
		go func() {
			err := readCols(ctx, log, tg, oldCols, stop, newCols)
			// Select on ctx so this goroutine cannot leak if the caller
			// returns before draining ec.
			select {
			case <-ctx.Done():
			case ec <- err:
			}
		}()

		// Must read at least one column every cycle to ensure we make forward progress.
		more := true
		select {
		case <-ctx.Done():
			return false, fmt.Errorf("first column: %w", ctx.Err())
		case col := <-newCols:
			cols = append(cols, normalize(col))
		case err := <-ec:
			if err != nil {
				return false, fmt.Errorf("read first column: %w", err)
			}
			more = false
		}

		// Read as many additional columns as we can within the allocated time.
		log.Trace("Reading additional columns...")
		for more {
			select {
			case <-grace.Done():
				// Out of time; remember that more columns remain unread.
				unreadColumns = true
				more = false
			case <-ctx.Done():
				return false, ctx.Err()
			case col := <-newCols:
				cols = append(cols, normalize(col))
			case err := <-ec:
				if err != nil {
					return false, fmt.Errorf("read columns: %w", err)
				}
				more = false
			}
		}

		log = log.WithField("appended", len(cols))

		overrideBuild(tg, cols) // so we group correctly
		cols = append(cols, oldCols...)
		cols = groupColumns(tg, cols)
	}
	readColsDur := time.Since(readColsStart)

	SortStarted(cols)

	shrinkStart := time.Now()
	cols = truncateGrid(cols, byteCeiling) // Assume each cell is at least 1 byte
	var grid *statepb.Grid
	var buf []byte
	grid, buf, err = shrinkGridInline(shrinkGrace, log, tg, cols, issues, byteCeiling)
	if err != nil {
		// %w (not %v) so callers can unwrap, matching the other error paths.
		return false, fmt.Errorf("shrink grid inline: %w", err)
	}
	shrinkDur := time.Since(shrinkStart)

	grid.Config = tg

	log = log.WithField("url", gridPath).WithField("bytes", len(buf))
	if !write {
		log = log.WithField("dryrun", true)
	} else {
		log.Debug("Writing grid...")
		// TODO(fejta): configurable cache value
		if _, err := client.Upload(ctx, gridPath, buf, gcs.DefaultACL, gcs.NoCache); err != nil {
			return false, fmt.Errorf("upload %d bytes: %w", len(buf), err)
		}
	}
	if unreadColumns {
		log = log.WithField("more", true)
	}
	log.WithFields(logrus.Fields{
		"cols":     len(grid.Columns),
		"rows":     len(grid.Rows),
		"inflate":  inflateDur,
		"readCols": readColsDur,
		"shrink":   shrinkDur,
	}).Info("Wrote grid")
	return unreadColumns, nil
}