azkustodata/query/v2/fast_json.go (224 lines of code) (raw):

package v2

import (
	"bytes"
	"encoding/json"
	"io"

	"github.com/Azure/azure-kusto-go/azkustodata/errors"
	"github.com/Azure/azure-kusto-go/azkustodata/query"
	"github.com/Azure/azure-kusto-go/azkustodata/types"
	"github.com/Azure/azure-kusto-go/azkustodata/value"
)

// newDecoder returns a JSON decoder reading from r that represents every number as
// json.Number rather than float64. This keeps numbers that do not fit in a float64,
// such as uint64 or decimal values, parseable.
func newDecoder(r io.Reader) *json.Decoder {
	decoder := json.NewDecoder(r)
	decoder.UseNumber()
	return decoder
}

// UnmarshalJSON implements the json.Unmarshaler interface for TableFragment.
// The rows are decoded against the fragment's existing Columns and numbered
// starting at PreviousIndex. See decodeTableFragment for further explanation.
func (t *TableFragment) UnmarshalJSON(b []byte) error {
	rows, err := decodeTableFragment(b, newDecoder(bytes.NewReader(b)), t.Columns, t.PreviousIndex)
	if err != nil {
		return err
	}

	t.Rows = rows
	return nil
}

// UnmarshalJSON implements the json.Unmarshaler interface for DataTable.
// A DataTable is "just" a TableHeader followed by a TableFragment, so the header is
// decoded first and the same decoder is then used to decode the rows, numbered from 0.
func (q *DataTable) UnmarshalJSON(b []byte) error {
	decoder := newDecoder(bytes.NewReader(b))

	if err := decodeHeader(decoder, &q.Header, DataTableFrameType); err != nil {
		return err
	}

	rows, err := decodeTableFragment(b, decoder, q.Header.Columns, 0)
	if err != nil {
		return err
	}

	q.Rows = rows
	return nil
}

// UnmarshalJSON implements the json.Unmarshaler interface for TableHeader.
// We need to decode this manually to set the correct Columns, in order to save on allocations later on.
func (t *TableHeader) UnmarshalJSON(b []byte) error {
	return decodeHeader(newDecoder(bytes.NewReader(b)), t, TableHeaderFrameType)
}

// decodeHeader decodes the header of a table, which is the same for TableHeader and DataTable.
// It assumes the order of the properties in the JSON is fixed.
func decodeHeader(decoder *json.Decoder, t *TableHeader, frameType FrameType) error { err := assertToken(decoder, json.Delim('{')) if err != nil { return err } err = assertStringProperty(decoder, "FrameType", string(frameType)) if err != nil { return err } t.TableId, err = getIntProperty(decoder, "TableId") if err != nil { return err } t.TableKind, err = getStringProperty(decoder, "TableKind") if err != nil { return err } t.TableName, err = getStringProperty(decoder, "TableName") if err != nil { return err } err = assertToken(decoder, json.Token("Columns")) if err != nil { return err } t.Columns, err = decodeColumns(decoder) if err != nil { return err } return nil } // decodeTableFragment decodes the common part of a TableFragment and DataTable - the rows. func decodeTableFragment(b []byte, decoder *json.Decoder, columns []query.Column, previousIndex int) ([]query.Row, error) { // skip properties until we reach the Rows property (guaranteed to be the last one) for { tok, err := decoder.Token() if err != nil { return nil, err } if tok == json.Token("Rows") { break } } rows, err := decodeRows(b, decoder, columns, previousIndex) if err != nil { return nil, err } return rows, nil } // decodeColumns decodes the columns of a table from the JSON. // Columns is an array of the form [ { "ColumnName": "name", "ColumnType": "type" }, ... ] // 1. We need to set the ColumnIndex, which is not present in the JSON // 2. We need to normalize the column type - in rare cases, kusto has type aliases like "date" instead of "datetime", and we need to normalize them // 3. 
We need to validate the column type - if it's not a valid type, we should return an error func decodeColumns(decoder *json.Decoder) ([]query.Column, error) { cols := make([]query.Column, 0) err := assertToken(decoder, json.Delim('[')) if err != nil { return nil, err } for i := 0; decoder.More(); i++ { col := FrameColumn{ ColumnIndex: i, } decoder.Decode(&col) // Normalize the column type - error is an empty string col.ColumnType = string(types.NormalizeColumn(col.ColumnType)) if col.ColumnType == "" { return nil, errors.ES(errors.OpTableAccess, errors.KClientArgs, "column[%d] is of type %s, which is not valid", i, col.ColumnType) } cols = append(cols, col) } if err := assertToken(decoder, json.Delim(']')); err != nil { return nil, err } return cols, nil } // decodeRows decodes the rows of a table from the JSON. // Rows is an array of the form [ [value1, value2, ...], ... ] // In V2 Fragmented, it's guaranteed that no errors will appear in the middle of the array, only at the end of the table. // This function: // 1. Creates a cached map of column names to columns for faster lookup // 2. Decodes the rows into a slice of query.Rows func decodeRows(b []byte, decoder *json.Decoder, cols []query.Column, startIndex int) ([]query.Row, error) { const RowArrayAllocSize = 10 var rows = make([]query.Row, 0, RowArrayAllocSize) columnsByName := make(map[string]query.Column, len(cols)) for _, c := range cols { columnsByName[c.Name()] = c } err := assertToken(decoder, json.Delim('[')) if err != nil { return nil, err } for i := startIndex; decoder.More(); i++ { rowValues, err := decodeRow(b, decoder, cols) if err != nil { return nil, err } row := query.NewRowFromParts(cols, func(name string) query.Column { return columnsByName[name] }, i, rowValues) rows = append(rows, row) } if err := assertToken(decoder, json.Delim(']')); err != nil { return nil, err } return rows, nil } // decodeRow decodes a single row from the JSON. 
// A row is an array of values of the types from kusto, as indicated by the columns. // For dynamic values, they can appear as nested arrays or objects, so we need to handle them. // Otherwise, we just unmarshal the value into the correct type. func decodeRow( buffer []byte, decoder *json.Decoder, cols []query.Column) (value.Values, error) { err := assertToken(decoder, json.Delim('[')) if err != nil { return nil, err } values := make([]value.Kusto, 0, len(cols)) field := 0 for ; decoder.More(); field++ { t, err := decoder.Token() if err != nil { return nil, err } // Handle nested values if t == json.Delim('[') || t == json.Delim('{') { t, err = decodeNestedValue(decoder, buffer) if err != nil { return nil, err } } // Create a new value of the correct type kustoValue := value.Default(cols[field].Type()) // Unmarshal the value err = kustoValue.Unmarshal(t) if err != nil { return nil, err } values = append(values, kustoValue) } err = assertToken(decoder, json.Delim(']')) if err != nil { return nil, err } return values, nil } // decodeNestedValue decodes a nested value from the JSON into a byte array inside a json.Token. // How it works: // 1. We need the original buffer to be able to extract the nested value from the offsets. // 2. We get the starting offset of the nested value. // 3. We get the next tokens, we ignore all of them unless they start a new nested value. // 4. If we find a nested value, we increase the nesting level, and decrease it when we find the closing token. // 5. At the end, we're guaranteed to be at the end of original the nested value. // 6. We get the final offset of the nested value. // 7. We return a json.Token that points to the entire byte range of the nested value. 
func decodeNestedValue(decoder *json.Decoder, buffer []byte) (json.Token, error) { nest := 1 initialOffset := decoder.InputOffset() - 1 for { for decoder.More() { t, err := decoder.Token() if err != nil { return nil, err } if t == json.Delim('[') || t == json.Delim('{') { nest++ } } t, err := decoder.Token() if err != nil { return nil, err } if t == json.Delim(']') || t == json.Delim('}') { nest-- } if nest == 0 { break } } finalOffset := decoder.InputOffset() return json.Token(buffer[initialOffset:finalOffset]), nil } // validateDataSetHeader makes sure the dataset header is valid for V2 Fragmented Query. func validateDataSetHeader(dec *json.Decoder) error { const HeaderVersion = "v2.0" const NotProgressive = false const IsFragmented = true const ErrorReportingEndOfTable = "EndOfTable" if err := assertToken(dec, json.Delim('{')); err != nil { return err } if err := assertStringProperty(dec, "FrameType", json.Token(string(DataSetHeaderFrameType))); err != nil { return err } if err := assertStringProperty(dec, "IsProgressive", json.Token(NotProgressive)); err != nil { return err } if err := assertStringProperty(dec, "Version", json.Token(HeaderVersion)); err != nil { return err } if err := assertStringProperty(dec, "IsFragmented", json.Token(IsFragmented)); err != nil { return err } if err := assertStringProperty(dec, "ErrorReportingPlacement", json.Token(ErrorReportingEndOfTable)); err != nil { return err } return nil }