src/parse.c (678 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "qpid/dispatch/parse.h" #include "qpid/dispatch/alloc.h" #include "qpid/dispatch/amqp.h" #include "qpid/dispatch/ctools.h" #include "buffer_field_api.h" #include "buffer_field_api.h" #include <assert.h> #include <inttypes.h> #include <stdio.h> DEQ_DECLARE(qd_parsed_field_t, qd_parsed_field_list_t); typedef struct qd_amqp_field_t { uint8_t tag; uint32_t size; // includes length of count! uint32_t count; qd_buffer_field_t value; // the raw (encoded) value } qd_amqp_field_t; struct qd_parsed_field_t { DEQ_LINKS(qd_parsed_field_t); const qd_parsed_field_t *parent; qd_parsed_field_list_t children; qd_iterator_t *typed_iter; // iterator over the full field (header and value) qd_iterator_t *raw_iter; // iterator over just the value const char *parse_error; qd_buffer_field_t full_field; // contains encoded AMQP type header and value qd_amqp_field_t amqp; // decoded header and raw value }; ALLOC_DECLARE(qd_parsed_field_t); ALLOC_DEFINE(qd_parsed_field_t); qd_parsed_field_t* qd_field_first_child(qd_parsed_field_t *field) { return DEQ_HEAD(field->children); } qd_parsed_field_t* qd_field_next_child(qd_parsed_field_t *field) { return DEQ_NEXT(field); } // length of size and count of AMQP data fields can be determined by the value // of the top 4 bits of the tag octet. See AMQP 1.0 Part 1 Types. // static inline int tag_get_size_length(uint8_t tag) { tag &= 0xF0; if (tag < 0xA0) return 0; if ((tag & 0x10) == 0) return 1; return 4; } static inline int tag_get_count_length(uint8_t tag) { tag &= 0xF0; if (tag < 0xC0) return 0; if ((tag & 0x10) == 0) return 1; return 4; } /** * Extract an AMQP value from the encoded data held in *bfield and store it in *value. * bfield is expected to point to the tag octet and will be advanced past the decoded value. * Returns 0 on success, else an error message. */ static inline char *parse_amqp_field(qd_buffer_field_t *bfield, qd_amqp_field_t *value) { ZERO(value); if (!qd_buffer_field_octet(bfield, &value->tag)) return "Insufficient Data to Determine Tag"; uint32_t length_of_count = tag_get_count_length(value->tag); uint32_t length_of_size = tag_get_size_length(value->tag); // extract size and content (optional) switch (value->tag & 0xF0) { case 0x40: break; case 0x50: value->size = 1; break; case 0x60: value->size = 2; break; case 0x70: value->size = 4; break; case 0x80: value->size = 8; break; case 0x90: value->size = 16; break; case 0xB0: case 0xD0: case 0xF0: { (void) length_of_size; // ignore unused var error assert(length_of_size == 4); if (!qd_buffer_field_uint32(bfield, &value->size)) { return "Insufficient Data to Determine Length"; } if (length_of_count) { assert(length_of_count == 4); if (!qd_buffer_field_uint32(bfield, &value->count)) { return "Insufficient Data to Determine Count"; } } } break; case 0xA0: case 0xC0: case 0xE0: { uint8_t octet; assert(length_of_size == 1); if (!qd_buffer_field_octet(bfield, &octet)) { return "Insufficient Data to Determine Length"; } value->size = octet; if (length_of_count) { assert(length_of_count == 1); if (!qd_buffer_field_octet(bfield, &octet)) { return "Insufficient Data to Determine Count"; } value->count = octet; } break; } default: return "Invalid Tag - No Length Information"; } if ((value->tag == QD_AMQP_MAP8 || value->tag == QD_AMQP_MAP32) && (value->count & 1)) return "Odd Number of Elements in a Map"; if (length_of_count > value->size) return "Insufficient Length to Determine Count"; value->value = *bfield; value->value.remaining = value->size - length_of_count; size_t moved = qd_buffer_field_advance(bfield, value->value.remaining); if (moved != value->value.remaining) return "Truncated field"; return 0; } // bfield contains the encoded AMQP data to be parsed. bfield starts at the // type tag octet and should be long enough to hold the entire AMQP data type. // On return bfield has been advanced past the encoded AMQP data. // static qd_parsed_field_t *qd_parse_internal(qd_buffer_field_t *bfield, qd_parsed_field_t *p) { qd_parsed_field_t *field = new_qd_parsed_field_t(); if (!field) return 0; ZERO(field); DEQ_ITEM_INIT(field); DEQ_INIT(field->children); field->parent = p; field->full_field = *bfield; field->parse_error = parse_amqp_field(bfield, &field->amqp); if (!field->parse_error) { // truncate full_field in case bfield holds multiple values. // since bfield has advanced past the parsed field we just subtract it. field->full_field.remaining -= bfield->remaining; // now parse out the content of any contained types: qd_buffer_field_t children = field->amqp.value; for (uint32_t idx = 0; idx < field->amqp.count; idx++) { qd_parsed_field_t *child = qd_parse_internal(&children, field); DEQ_INSERT_TAIL(field->children, child); if (!qd_parse_ok(child)) { field->parse_error = child->parse_error; break; } } } return field; } qd_parsed_field_t *qd_parse(const qd_iterator_t *iter) { if (!iter) return 0; qd_buffer_field_t bfield = qd_iterator_get_view_cursor(iter); return qd_parse_internal(&bfield, 0); } void qd_parse_free(qd_parsed_field_t *field) { if (!field) return; assert(field->parent == 0); if (field->raw_iter) qd_iterator_free(field->raw_iter); if (field->typed_iter) qd_iterator_free(field->typed_iter); qd_parsed_field_t *sub_field = DEQ_HEAD(field->children); while (sub_field) { qd_parsed_field_t *next = DEQ_NEXT(sub_field); DEQ_REMOVE_HEAD(field->children); sub_field->parent = 0; qd_parse_free(sub_field); sub_field = next; } free_qd_parsed_field_t(field); } static qd_parsed_field_t *qd_parse_dup_internal(const qd_parsed_field_t *field, const qd_parsed_field_t *parent) { qd_parsed_field_t *dup = new_qd_parsed_field_t(); if (dup == 0) return 0; ZERO(dup); dup->parent = parent; dup->raw_iter = qd_iterator_dup(field->raw_iter); dup->typed_iter = qd_iterator_dup(field->typed_iter); dup->amqp = field->amqp; dup->full_field = field->full_field; qd_parsed_field_t *child = DEQ_HEAD(field->children); while (child) { qd_parsed_field_t *dup_child = qd_parse_dup_internal(child, field); DEQ_INSERT_TAIL(dup->children, dup_child); child = DEQ_NEXT(child); } return dup; } qd_parsed_field_t *qd_parse_dup(const qd_parsed_field_t *field) { return field ? qd_parse_dup_internal(field, 0) : 0; } int qd_parse_ok(qd_parsed_field_t *field) { return field && field->parse_error == 0; } const char *qd_parse_error(qd_parsed_field_t *field) { return field ? field->parse_error : "No field"; } uint8_t qd_parse_tag(qd_parsed_field_t *field) { assert(field); return field->amqp.tag; } // just the data (no header/tag) qd_iterator_t *qd_parse_raw(qd_parsed_field_t *field) { if (!field) return 0; if (!field->raw_iter) { field->raw_iter = qd_iterator_buffer_field(&field->amqp.value, ITER_VIEW_ALL); } return field->raw_iter; } // includes type header, tag and data qd_iterator_t *qd_parse_typed(qd_parsed_field_t *field) { if (!field) return 0; if (!field->typed_iter) { field->typed_iter = qd_iterator_buffer_field(&field->full_field, ITER_VIEW_ALL); } return field->typed_iter; } qd_buffer_field_t qd_parse_value(const qd_parsed_field_t *field) { assert(field && !field->parse_error); return field->amqp.value; } uint32_t qd_parse_as_uint(qd_parsed_field_t *field) { uint32_t result = 0; uint64_t tmp = qd_parse_as_ulong(field); if (qd_parse_ok(field)) { if (tmp <= UINT32_MAX) { result = tmp; } else { field->parse_error = "Integer value too large to parse as uint"; } } return result; } uint64_t qd_parse_as_ulong(qd_parsed_field_t *parsed_field) { uint64_t result = 0; uint32_t tmp32 = 0; uint8_t octet = 0; qd_buffer_field_t field = parsed_field->amqp.value; switch (parsed_field->amqp.tag) { case QD_AMQP_ULONG: case QD_AMQP_TIMESTAMP: qd_buffer_field_uint32(&field, &tmp32); result = ((uint64_t) tmp32) << 32; qd_buffer_field_uint32(&field, &tmp32); result |= ((uint64_t) tmp32); break; case QD_AMQP_UINT: qd_buffer_field_uint32(&field, &tmp32); result = tmp32; break; case QD_AMQP_USHORT: qd_buffer_field_octet(&field, &octet); result = ((uint64_t) octet) << 8; // Fall Through... case QD_AMQP_BOOLEAN: case QD_AMQP_UBYTE: case QD_AMQP_SMALLUINT: case QD_AMQP_SMALLULONG: qd_buffer_field_octet(&field, &octet); result |= (uint64_t) octet; break; case QD_AMQP_TRUE: result = 1; break; case QD_AMQP_FALSE: case QD_AMQP_UINT0: case QD_AMQP_ULONG0: // already zeroed break; case QD_AMQP_STR8_UTF8: case QD_AMQP_STR32_UTF8: case QD_AMQP_SYM8: case QD_AMQP_SYM32: { // conversion from string to 64 bit unsigned integer: char *value = qd_buffer_field_strdup(&field); if (sscanf(value, "%"SCNu64, &result) != 1) parsed_field->parse_error = "Cannot convert string to unsigned long"; free(value); } break; case QD_AMQP_BYTE: case QD_AMQP_SHORT: case QD_AMQP_INT: case QD_AMQP_SMALLINT: case QD_AMQP_LONG: case QD_AMQP_SMALLLONG: { // if a signed integer is positive, accept it int64_t ltmp = qd_parse_as_long(parsed_field); if (qd_parse_ok(parsed_field)) { if (ltmp >= 0) { result = (uint64_t)ltmp; } else { parsed_field->parse_error = "Unable to parse negative integer as unsigned"; } } } break; default: parsed_field->parse_error = "Unable to parse as an unsigned integer"; // catch any missing types during development assert(false); } return result; } int32_t qd_parse_as_int(qd_parsed_field_t *field) { int32_t result = 0; int64_t tmp = qd_parse_as_long(field); if (qd_parse_ok(field)) { if (INT32_MIN <= tmp && tmp <= INT32_MAX) { result = tmp; } else { field->parse_error = "Integer value too large to parse as int"; } } return result; } int64_t qd_parse_as_long(qd_parsed_field_t *parsed_field) { int64_t result = 0; qd_buffer_field_t field = parsed_field->amqp.value; switch (parsed_field->amqp.tag) { case QD_AMQP_LONG: { uint64_t convert; uint32_t tmp32 = 0; qd_buffer_field_uint32(&field, &tmp32); convert = ((uint64_t) tmp32) << 32; qd_buffer_field_uint32(&field, &tmp32); convert |= (uint64_t) tmp32; result = (int64_t) convert; break; } case QD_AMQP_INT: { uint32_t tmp = 0; qd_buffer_field_uint32(&field, &tmp); result = (int32_t) tmp; break; } case QD_AMQP_SHORT: { uint16_t convert; uint8_t octet = 0; qd_buffer_field_octet(&field, &octet); convert = ((uint16_t) octet) << 8; qd_buffer_field_octet(&field, &octet); convert |= ((uint16_t) octet); result = (int16_t) convert; break; } case QD_AMQP_BYTE: case QD_AMQP_BOOLEAN: case QD_AMQP_SMALLLONG: case QD_AMQP_SMALLINT: { uint8_t octet = 0; qd_buffer_field_octet(&field, &octet); result = (int8_t) octet; break; } case QD_AMQP_TRUE: result = 1; break; case QD_AMQP_FALSE: case QD_AMQP_UINT0: case QD_AMQP_ULONG0: // already zeroed break; case QD_AMQP_STR8_UTF8: case QD_AMQP_STR32_UTF8: case QD_AMQP_SYM8: case QD_AMQP_SYM32: { // conversion from string to 64 bit integer: char *value = qd_buffer_field_strdup(&field); if (sscanf(value, "%"SCNi64, &result) != 1) parsed_field->parse_error = "Cannot convert string to long"; free(value); } break; case QD_AMQP_UBYTE: case QD_AMQP_SMALLUINT: case QD_AMQP_SMALLULONG: case QD_AMQP_USHORT: case QD_AMQP_UINT: case QD_AMQP_ULONG: { // if an unsigned integer "fits" accept it uint64_t utmp = qd_parse_as_ulong(parsed_field); if (qd_parse_ok(parsed_field)) { uint64_t max = INT8_MAX; switch (parsed_field->amqp.tag) { case QD_AMQP_USHORT: max = INT16_MAX; break; case QD_AMQP_UINT: max = INT32_MAX; break; case QD_AMQP_ULONG: max = INT64_MAX; break; } if (utmp <= max) { result = (int64_t)utmp; } else { parsed_field->parse_error = "Unable to parse unsigned integer as a signed integer"; } } } break; default: parsed_field->parse_error = "Unable to parse as a signed integer"; // catch any missing types during development assert(false); } return result; } bool qd_parse_as_bool(qd_parsed_field_t *parsed_field) { bool result = false; qd_buffer_field_t field = parsed_field->amqp.value; switch (parsed_field->amqp.tag) { case QD_AMQP_BYTE: case QD_AMQP_BOOLEAN: { uint8_t octet = 0; qd_buffer_field_octet(&field, &octet); result = !!octet; break; } case QD_AMQP_TRUE: result = true; break; } return result; } char *qd_parse_as_string(const qd_parsed_field_t *parsed_field) { char *str = 0; switch (parsed_field->amqp.tag) { case QD_AMQP_STR8_UTF8: case QD_AMQP_SYM8: case QD_AMQP_STR32_UTF8: case QD_AMQP_SYM32: { qd_buffer_field_t tmp = parsed_field->amqp.value; str = qd_buffer_field_strdup(&tmp); break; } default: break; } return str; } uint32_t qd_parse_sub_count(qd_parsed_field_t *field) { uint32_t count = DEQ_SIZE(field->children); if (field->amqp.tag == QD_AMQP_MAP8 || field->amqp.tag == QD_AMQP_MAP32) count = count >> 1; return count; } qd_parsed_field_t *qd_parse_sub_key(qd_parsed_field_t *field, uint32_t idx) { if (field->amqp.tag != QD_AMQP_MAP8 && field->amqp.tag != QD_AMQP_MAP32) return 0; idx = idx << 1; qd_parsed_field_t *key = DEQ_HEAD(field->children); while (idx && key) { idx--; key = DEQ_NEXT(key); } return key; } qd_parsed_field_t *qd_parse_sub_value(qd_parsed_field_t *field, uint32_t idx) { if (field->amqp.tag == QD_AMQP_MAP8 || field->amqp.tag == QD_AMQP_MAP32) idx = (idx << 1) + 1; qd_parsed_field_t *key = DEQ_HEAD(field->children); while (idx && key) { idx--; key = DEQ_NEXT(key); } return key; } int is_tag_a_map(uint8_t tag) { return tag == QD_AMQP_MAP8 || tag == QD_AMQP_MAP32; } int qd_parse_is_map(qd_parsed_field_t *field) { if (!field) return 0; return is_tag_a_map(field->amqp.tag); } int qd_parse_is_list(qd_parsed_field_t *field) { if (!field) return 0; return field->amqp.tag == QD_AMQP_LIST8 || field->amqp.tag == QD_AMQP_LIST32 || field->amqp.tag == QD_AMQP_LIST0; } int qd_parse_is_scalar(qd_parsed_field_t *field) { return DEQ_SIZE(field->children) == 0; } static inline bool qd_parse_is_string(const qd_parsed_field_t *field) { return field->amqp.tag == QD_AMQP_STR8_UTF8 || field->amqp.tag == QD_AMQP_STR32_UTF8; } qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *key) { if (!key) return 0; uint32_t count = qd_parse_sub_count(field); for (uint32_t idx = 0; idx < count; idx++) { qd_parsed_field_t *sub = qd_parse_sub_key(field, idx); if (!sub) return 0; qd_buffer_field_t value = sub->amqp.value; size_t len = strlen(key); if (qd_buffer_field_equal(&value, (const uint8_t*) key, len)) { return qd_parse_sub_value(field, idx); } } return 0; } const char *qd_parse_annotations( bool strip_annotations_in, qd_iterator_t *ma_iter_in, qd_parsed_field_t **ma_ingress, qd_parsed_field_t **ma_phase, qd_parsed_field_t **ma_to_override, qd_parsed_field_t **ma_trace, qd_parsed_field_t **ma_stream, qd_buffer_field_t *user_annotations, uint32_t *user_count) { *ma_ingress = 0; *ma_phase = 0; *ma_to_override = 0; *ma_trace = 0; ZERO(user_annotations); *user_count = 0; if (!ma_iter_in) return 0; // ok - MA not present const char *error = 0; qd_buffer_field_t bfield = qd_iterator_get_view_cursor(ma_iter_in); qd_amqp_field_t ma_map; error = parse_amqp_field(&bfield, &ma_map); if (error) return error; if (ma_map.tag != QD_AMQP_MAP8 && ma_map.tag != QD_AMQP_MAP32) return "Invalid message annotations section - missing map type"; if (ma_map.count & 0x01) return "Invalid MA map count (odd number of fields)"; if (ma_map.count == 0) return 0; // empty map, ignore // ma_map.value now holds all of the key/value fields in the map and points // to the first key/value pair. The router-specific map entries always // come after any user-supplied MA data. Snapshot the current location for // the start of user data user_annotations->buffer = ma_map.value.buffer; user_annotations->cursor = ma_map.value.cursor; bool user_anno = true; // assume first annotations are non-router size_t user_annos_size = 0; uint32_t user_annos_count = 0; // Now iterate over each key looking for router-specific map entires qd_buffer_field_t ma_fields = ma_map.value; int kv_count = ma_map.count / 2; // pairs of key,value fields while (kv_count--) { qd_amqp_field_t key; // extract key, advance ma_fields to the value field error = parse_amqp_field(&ma_fields, &key); if (error) return error; if (key.tag == QD_AMQP_SYM8 || key.tag == QD_AMQP_SYM32) { switch (key.value.remaining) { case QD_MA_PREFIX_LEN: if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_PREFIX, QD_MA_PREFIX_LEN)) { qd_amqp_field_t skip; user_anno = false; // empty router annotation - ignore it error = parse_amqp_field(&ma_fields, &skip); if (error) return error; } break; case QD_MA_TO_LEN: if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_TO, QD_MA_TO_LEN)) { user_anno = false; if (!strip_annotations_in) { (*ma_to_override) = qd_parse_internal(&ma_fields, 0); if (!qd_parse_ok((*ma_to_override))) return (*ma_to_override)->parse_error; if (!qd_parse_is_string(*ma_to_override)) return "to-override not a valid string type"; } } break; case QD_MA_TRACE_LEN: // Same length as QD_MA_PHASE_LEN and QD_MA_CLASS_LEN: assert(QD_MA_TRACE_LEN == QD_MA_PHASE_LEN); assert(QD_MA_PHASE_LEN == QD_MA_CLASS_LEN); if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_TRACE, QD_MA_TRACE_LEN)) { user_anno = false; if (!strip_annotations_in) { (*ma_trace) = qd_parse_internal(&ma_fields, 0); if (!qd_parse_ok((*ma_trace))) return (*ma_trace)->parse_error; if (!qd_parse_is_list((*ma_trace))) return "trace annotation is not a list"; bool all_str = true; for (qd_parsed_field_t *node = DEQ_HEAD((*ma_trace)->children); node && all_str; node = DEQ_NEXT(node)) { all_str = qd_parse_is_string(node); } if (!all_str) return "trace list contains non-string entries"; } } else if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_PHASE, QD_MA_PHASE_LEN)) { user_anno = false; // always encoded as an int, may be small: if (!strip_annotations_in) { (*ma_phase) = qd_parse_internal(&ma_fields, 0); if (!qd_parse_ok((*ma_phase))) return (*ma_phase)->parse_error; } } else if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_CLASS, QD_MA_CLASS_LEN)) { // no longer used - skip it qd_amqp_field_t skip; user_anno = false; error = parse_amqp_field(&ma_fields, &skip); if (error) return error; } break; case QD_MA_STREAM_LEN: if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_STREAM, QD_MA_STREAM_LEN)) { user_anno = false; if (!strip_annotations_in) { (*ma_stream) = qd_parse_internal(&ma_fields, 0); if (!qd_parse_ok((*ma_stream))) return (*ma_stream)->parse_error; } } break; case QD_MA_INGRESS_LEN: if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_INGRESS, QD_MA_INGRESS_LEN)) { user_anno = false; if (!strip_annotations_in) { (*ma_ingress) = qd_parse_internal(&ma_fields, 0); if (!qd_parse_ok((*ma_ingress))) return (*ma_ingress)->parse_error; if (!qd_parse_is_string(*ma_ingress)) return "ingress router not a string type"; } } break; default: // user value break; } } if (user_anno) { qd_amqp_field_t user; // move past the value: error = parse_amqp_field(&ma_fields, &user); if (error) return error; size_t key_len = 1 + tag_get_size_length(key.tag) + key.size; size_t value_len = 1 + tag_get_size_length(user.tag) + user.size; user_annos_size += key_len + value_len; user_annos_count += 2; } else if (strip_annotations_in) { // hit the first non-user key - stop break; } } user_annotations->remaining = user_annos_size; *user_count = user_annos_count; return 0; }