protected static SchemaAndValue parse()

in connect/api/src/main/java/org/apache/kafka/connect/data/Values.java [767:961]


    protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException {
        if (!parser.hasNext()) {
            return null;
        }
        if (embedded) {
            if (parser.canConsume(QUOTE_DELIMITER)) {
                StringBuilder sb = new StringBuilder();
                while (parser.hasNext()) {
                    if (parser.canConsume(QUOTE_DELIMITER)) {
                        break;
                    }
                    sb.append(parser.next());
                }
                String content = sb.toString();
                // We can parse string literals as temporal logical types, but all others
                // are treated as strings
                SchemaAndValue parsed = parseString(content);
                if (parsed != null && TEMPORAL_LOGICAL_TYPE_NAMES.contains(parsed.schema().name())) {
                    return parsed;
                }
                return new SchemaAndValue(Schema.STRING_SCHEMA, content);
            }
        }

        if (canParseSingleTokenLiteral(parser, embedded, NULL_VALUE)) {
            return null;
        }
        if (canParseSingleTokenLiteral(parser, embedded, TRUE_LITERAL)) {
            return TRUE_SCHEMA_AND_VALUE;
        }
        if (canParseSingleTokenLiteral(parser, embedded, FALSE_LITERAL)) {
            return FALSE_SCHEMA_AND_VALUE;
        }

        int startPosition = parser.mark();

        try {
            if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
                List<Object> result = new ArrayList<>();
                Schema elementSchema = null;
                while (parser.hasNext()) {
                    if (parser.canConsume(ARRAY_END_DELIMITER)) {
                        Schema listSchema;
                        if (elementSchema != null) {
                            listSchema = SchemaBuilder.array(elementSchema).schema();
                            result = alignListEntriesWithSchema(listSchema, result);
                        } else {
                            // Every value is null
                            listSchema = SchemaBuilder.arrayOfNull().build();
                        }
                        return new SchemaAndValue(listSchema, result);
                    }

                    if (parser.canConsume(COMMA_DELIMITER)) {
                        throw new DataException("Unable to parse an empty array element: " + parser.original());
                    }
                    SchemaAndValue element = parse(parser, true);
                    elementSchema = commonSchemaFor(elementSchema, element);
                    result.add(element != null ? element.value() : null);

                    int currentPosition = parser.mark();
                    if (parser.canConsume(ARRAY_END_DELIMITER)) {
                        parser.rewindTo(currentPosition);
                    } else if (!parser.canConsume(COMMA_DELIMITER)) {
                        throw new DataException("Array elements missing '" + COMMA_DELIMITER + "' delimiter");
                    }
                }

                // Missing either a comma or an end delimiter
                if (COMMA_DELIMITER.equals(parser.previous())) {
                    throw new DataException("Array is missing element after ',': " + parser.original());
                }
                throw new DataException("Array is missing terminating ']': " + parser.original());
            }

            if (parser.canConsume(MAP_BEGIN_DELIMITER)) {
                Map<Object, Object> result = new LinkedHashMap<>();
                Schema keySchema = null;
                Schema valueSchema = null;
                while (parser.hasNext()) {
                    if (parser.canConsume(MAP_END_DELIMITER)) {
                        Schema mapSchema;
                        if (keySchema != null && valueSchema != null) {
                            mapSchema = SchemaBuilder.map(keySchema, valueSchema).build();
                            result = alignMapKeysAndValuesWithSchema(mapSchema, result);
                        } else if (keySchema != null) {
                            mapSchema = SchemaBuilder.mapWithNullValues(keySchema);
                            result = alignMapKeysWithSchema(mapSchema, result);
                        } else {
                            mapSchema = SchemaBuilder.mapOfNull().build();
                        }
                        return new SchemaAndValue(mapSchema, result);
                    }

                    if (parser.canConsume(COMMA_DELIMITER)) {
                        throw new DataException("Unable to parse a map entry with no key or value: " + parser.original());
                    }
                    SchemaAndValue key = parse(parser, true);
                    if (key == null || key.value() == null) {
                        throw new DataException("Map entry may not have a null key: " + parser.original());
                    }

                    if (!parser.canConsume(ENTRY_DELIMITER)) {
                        throw new DataException("Map entry is missing '" + ENTRY_DELIMITER
                                                + "' at " + parser.position()
                                                + " in " + parser.original());
                    }
                    SchemaAndValue value = parse(parser, true);
                    Object entryValue = value != null ? value.value() : null;
                    result.put(key.value(), entryValue);

                    parser.canConsume(COMMA_DELIMITER);
                    keySchema = commonSchemaFor(keySchema, key);
                    valueSchema = commonSchemaFor(valueSchema, value);
                }
                // Missing either a comma or an end delimiter
                if (COMMA_DELIMITER.equals(parser.previous())) {
                    throw new DataException("Map is missing element after ',': " + parser.original());
                }
                throw new DataException("Map is missing terminating '}': " + parser.original());
            }
        } catch (DataException e) {
            LOG.trace("Unable to parse the value as a map or an array; reverting to string", e);
            parser.rewindTo(startPosition);
        }

        String token = parser.next();
        if (token.trim().isEmpty()) {
            return new SchemaAndValue(Schema.STRING_SCHEMA, token);
        }
        token = token.trim();

        char firstChar = token.charAt(0);
        boolean firstCharIsDigit = Character.isDigit(firstChar);

        // Temporal types are more restrictive, so try them first
        if (firstCharIsDigit) {
            // The time and timestamp literals may be split into 5 tokens since an unescaped colon
            // is a delimiter. Check these first since the first of these tokens is a simple numeric
            int position = parser.mark();
            String remainder = parser.next(4);
            if (remainder != null) {
                String timeOrTimestampStr = token + remainder;
                SchemaAndValue temporal = parseAsTemporal(timeOrTimestampStr);
                if (temporal != null) {
                    return temporal;
                }
            }
            // No match was found using the 5 tokens, so rewind and see if the current token has a date, time, or timestamp
            parser.rewindTo(position);
            SchemaAndValue temporal = parseAsTemporal(token);
            if (temporal != null) {
                return temporal;
            }
        }
        if (firstCharIsDigit || firstChar == '+' || firstChar == '-') {
            try {
                // Try to parse as a number ...
                BigDecimal decimal = new BigDecimal(token);
                try {
                    return new SchemaAndValue(Schema.INT8_SCHEMA, decimal.byteValueExact());
                } catch (ArithmeticException e) {
                    // continue
                }
                try {
                    return new SchemaAndValue(Schema.INT16_SCHEMA, decimal.shortValueExact());
                } catch (ArithmeticException e) {
                    // continue
                }
                try {
                    return new SchemaAndValue(Schema.INT32_SCHEMA, decimal.intValueExact());
                } catch (ArithmeticException e) {
                    // continue
                }
                try {
                    return new SchemaAndValue(Schema.INT64_SCHEMA, decimal.longValueExact());
                } catch (ArithmeticException e) {
                    // continue
                }
                double dValue = decimal.doubleValue();
                if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) {
                    return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue);
                }
                Schema schema = Decimal.schema(decimal.scale());
                return new SchemaAndValue(schema, decimal);
            } catch (NumberFormatException e) {
                // can't parse as a number
            }
        }
        if (embedded) {
            throw new DataException("Failed to parse embedded value");
        }
        // At this point, the only thing this non-embedded value can be is a string.
        return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original());
    }