internal/helper/lines/send.go (96 lines of code) (raw):

package lines import ( "bufio" "bytes" "errors" "io" "regexp" ) // SenderOpts contains fields that Send() uses to determine what is considered // a line, and how to handle pagination. That is, how many lines to skip, before // a line gets fed into the Sender. type SenderOpts struct { // Delimiter is the separator used to split the sender's output into // lines. Defaults to an empty byte (0). Delimiter byte // Limit is the upper limit of how many lines will be sent. The zero // value will cause no lines to be sent. Limit int // IsPageToken allows control over which results are sent as part of the // response. When IsPageToken evaluates to true for the first time, // results will start to be sent as part of the response. This function // will be called with an empty slice previous to sending the first line // in order to allow sending everything right from the beginning. IsPageToken func([]byte) bool // When PageTokenError is true than Sender will return an error when provided // PageToken is not found. PageTokenError bool // Filter limits sent results to those that pass the filter. The zero // value (nil) disables filtering. Filter *regexp.Regexp } var ( // ItemsPerMessage establishes the threshold to flush the buffer when using the // `Send` function. It's a variable instead of a constant to make it possible to // override in tests. ItemsPerMessage = 20 // ErrInvalidPageToken represents an error when the provided page token is invalid ErrInvalidPageToken = errors.New("could not find page token") ) // Sender handles a buffer of lines from a Git command type Sender func([][]byte) error type writer struct { sender Sender lines [][]byte options SenderOpts } // CopyAndAppend adds a newly allocated copy of `e` to the `s` slice. Useful to // avoid io buffer shennanigans func CopyAndAppend(s [][]byte, e []byte) [][]byte { line := make([]byte, len(e)) copy(line, e) return append(s, line) } // flush calls the `sender` handler function with the accumulated lines and // clears the lines buffer. func (w *writer) flush() error { if len(w.lines) == 0 { // No message to send, just return return nil } if err := w.sender(w.lines); err != nil { return err } // Reset the message w.lines = nil return nil } // addLine adds a new line to the writer buffer, and flushes if the maximum // size has been achieved func (w *writer) addLine(p []byte) error { w.lines = CopyAndAppend(w.lines, p) if len(w.lines) >= ItemsPerMessage { return w.flush() } return nil } // consume reads from an `io.Reader` and writes each line to the buffer. It // flushes after being done reading. func (w *writer) consume(r io.Reader) error { buf := bufio.NewReader(r) // As `IsPageToken` will instruct us to send the _next_ line only, we // need to call it before the first iteration to allow for the case // where we want to send right from the beginning. pastPageToken := w.options.IsPageToken([]byte{}) for i := 0; i < w.options.Limit; { var line []byte for { // delim can be multiple bytes, so we read till the end byte of it ... chunk, err := buf.ReadBytes(w.delimiter()) if err != nil && !errors.Is(err, io.EOF) { return err } line = append(line, chunk...) // ... then we check if the last bytes of line are the same as delim if bytes.HasSuffix(line, []byte{w.delimiter()}) { break } if errors.Is(err, io.EOF) { i = w.options.Limit // Implicit exit clause for the loop break } } line = bytes.TrimSuffix(line, []byte{w.delimiter()}) if len(line) == 0 { break } // If a page token is given, we need to skip all lines until we've found it. // All remaining lines will then be sent until we reach the pagination limit. if !pastPageToken { pastPageToken = w.options.IsPageToken(line) continue } if w.filter() != nil && !w.filter().Match(line) { continue } i++ // Only increment the counter if the result wasn't skipped if err := w.addLine(line); err != nil { return err } } if !pastPageToken && w.options.PageTokenError { return ErrInvalidPageToken } return w.flush() } func (w *writer) delimiter() byte { return w.options.Delimiter } func (w *writer) filter() *regexp.Regexp { return w.options.Filter } // Send reads output from `r`, splits it at `opts.Delimiter`, then handles the // buffered lines using `sender`. func Send(r io.Reader, sender Sender, opts SenderOpts) error { if opts.IsPageToken == nil { opts.IsPageToken = func(_ []byte) bool { return true } } writer := &writer{sender: sender, options: opts} return writer.consume(r) }