azkustodata/query/v2/frame_reader.go (64 lines of code) (raw):
package v2
import (
"bufio"
"context"
"github.com/Azure/azure-kusto-go/azkustodata/errors"
"io"
)
// frameReader reads frame-by-frame from a Kusto response.
// It is specifically designed for the FragmentedV2 protocol:
// 1. The response is a JSON array of frames, but separated by newlines.
// 2. We convert it into a proper JsonLines format by stripping the first byte of each line.
// 3. We then read each line as a separate frame.
// 4. When we reach the end of the array, it means we have reached the end of the response, and we return io.EOF.
// 5. For every line we read, we check if the context has been cancelled, and if so, return the error.
type frameReader struct {
orig io.ReadCloser
reader bufio.Reader
ctx context.Context
}
func newFrameReader(r io.ReadCloser, ctx context.Context) (*frameReader, error) {
br := bufio.NewReader(r)
err := validateJsonResponse(br)
if err != nil {
return nil, err
}
return &frameReader{orig: r, reader: *br, ctx: ctx}, nil
}
// validateJsonResponse reads the first byte of the response to determine if it is in fact valid JSON.
// Kusto may return an error message that is not JSON, and instead will just be a plain string with an error message.
// If the first byte is not '[', then we assume it is an error message and return an error.
func validateJsonResponse(br *bufio.Reader) error {
first, err := br.Peek(1)
if err != nil {
return err
}
if len(first) == 0 {
return errors.ES(errors.OpUnknown, errors.KInternal, "No data")
}
if first[0] != '[' {
all, err := io.ReadAll(br)
if err != nil {
return err
}
return errors.ES(errors.OpUnknown, errors.KInternal, "Got error: %v", string(all))
}
return nil
}
// advance reads the next frame from the response.
func (fr *frameReader) advance() ([]byte, error) {
// Check if the context has been cancelled, so we won't keep reading after the response is cancelled.
if fr.ctx.Err() != nil {
return nil, fr.ctx.Err()
}
// Read until the end of the current line, which is the entire frame.
line, err := fr.reader.ReadBytes('\n')
if err != nil {
return nil, err
}
// If the first character is ']', then we have reached the end of the response.
if len(line) > 0 && line[0] == ']' {
return nil, io.EOF
}
// Trim newline
line = line[:len(line)-1]
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
if len(line) < 2 {
return nil, errors.ES(errors.OpUnknown, errors.KInternal, "Got EOF while reading frame")
}
// We skip the first byte of the line, as it is a comma, or the start of the array.
if line[0] != '[' && line[0] != ',' {
return nil, errors.ES(errors.OpUnknown, errors.KInternal, "Expected comma or start array, got '%c'", line[0])
}
line = line[1:]
return line, nil
}
// Close closes the underlying reader.
func (fr *frameReader) close() error {
return fr.orig.Close()
}