in connect/api/src/main/java/org/apache/kafka/connect/data/Values.java [767:961]
/**
 * Parse the next value from the supplied parser and infer a schema for it.
 *
 * <p>The alternatives are tried in this order:
 * <ol>
 *   <li>a quoted string (only when {@code embedded} is true);</li>
 *   <li>the single-token literals for null, true and false;</li>
 *   <li>an array, then a map; if either fails with a {@link DataException} the parser is rewound
 *       to where the value started and parsing continues with the alternatives below;</li>
 *   <li>a date, time or timestamp (only when the token starts with a digit);</li>
 *   <li>a number (only when the token starts with a digit, {@code '+'} or {@code '-'}), using the
 *       narrowest of INT8, INT16, INT32, INT64, then FLOAT64, then {@link Decimal};</li>
 *   <li>for a non-embedded value, a string holding the parser's entire original input.</li>
 * </ol>
 *
 * @param parser   the parser positioned at the start of the value to read
 * @param embedded whether the value is nested inside another value; the recursive calls made for
 *                 array elements, map keys and map values pass {@code true}
 * @return the parsed schema and value, or {@code null} when the parser has no more tokens or the
 *         value is the null literal
 * @throws DataException if {@code embedded} is true and the value matches none of the alternatives
 * @throws NoSuchElementException declared by the original signature; presumably raised by the
 *         parser when tokens run out -- confirm against {@code Parser}
 */
protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException {
    if (!parser.hasNext()) {
        return null;
    }
    if (embedded && parser.canConsume(QUOTE_DELIMITER)) {
        return parseQuotedLiteral(parser);
    }

    if (canParseSingleTokenLiteral(parser, embedded, NULL_VALUE)) {
        return null;
    }
    if (canParseSingleTokenLiteral(parser, embedded, TRUE_LITERAL)) {
        return TRUE_SCHEMA_AND_VALUE;
    }
    if (canParseSingleTokenLiteral(parser, embedded, FALSE_LITERAL)) {
        return FALSE_SCHEMA_AND_VALUE;
    }

    int startPosition = parser.mark();
    try {
        if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
            return parseArrayBody(parser);
        }
        if (parser.canConsume(MAP_BEGIN_DELIMITER)) {
            return parseMapBody(parser);
        }
    } catch (DataException e) {
        // A malformed array or map is not an error at this level: undo everything that was
        // consumed and let the value be handled by the remaining alternatives.
        LOG.trace("Unable to parse the value as a map or an array; reverting to string", e);
        parser.rewindTo(startPosition);
    }

    String token = parser.next();
    if (token.trim().isEmpty()) {
        // A whitespace-only token is returned untrimmed
        return new SchemaAndValue(Schema.STRING_SCHEMA, token);
    }
    token = token.trim();
    char firstChar = token.charAt(0);
    boolean firstCharIsDigit = Character.isDigit(firstChar);

    // Temporal types are more restrictive, so try them first
    if (firstCharIsDigit) {
        SchemaAndValue temporal = parseTemporalToken(parser, token);
        if (temporal != null) {
            return temporal;
        }
    }
    if (firstCharIsDigit || firstChar == '+' || firstChar == '-') {
        SchemaAndValue number = parseNumericToken(token);
        if (number != null) {
            return number;
        }
    }
    if (embedded) {
        throw new DataException("Failed to parse embedded value");
    }
    // At this point, the only thing this non-embedded value can be is a string.
    return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original());
}

/**
 * Read a quoted string whose opening quote has already been consumed. Tokens are concatenated
 * until a closing quote is consumed or the parser runs out of tokens.
 *
 * @return the content as a temporal logical type when it parses as one, otherwise as a string
 */
private static SchemaAndValue parseQuotedLiteral(Parser parser) {
    StringBuilder sb = new StringBuilder();
    while (parser.hasNext()) {
        if (parser.canConsume(QUOTE_DELIMITER)) {
            break;
        }
        sb.append(parser.next());
    }
    String content = sb.toString();
    // We can parse string literals as temporal logical types, but all others
    // are treated as strings
    SchemaAndValue parsed = parseString(content);
    if (parsed != null && TEMPORAL_LOGICAL_TYPE_NAMES.contains(parsed.schema().name())) {
        return parsed;
    }
    return new SchemaAndValue(Schema.STRING_SCHEMA, content);
}

/**
 * Read the elements of an array whose opening delimiter has already been consumed.
 *
 * @return the list and its schema; the element schema is the common schema of all elements, or an
 *         array-of-null schema when no element contributed a schema
 * @throws DataException if an element is empty, a comma is missing, or the array is not terminated
 */
private static SchemaAndValue parseArrayBody(Parser parser) {
    List<Object> elements = new ArrayList<>();
    Schema elementSchema = null;
    while (parser.hasNext()) {
        if (parser.canConsume(ARRAY_END_DELIMITER)) {
            Schema listSchema;
            if (elementSchema != null) {
                listSchema = SchemaBuilder.array(elementSchema).schema();
                elements = alignListEntriesWithSchema(listSchema, elements);
            } else {
                // Every value is null
                listSchema = SchemaBuilder.arrayOfNull().build();
            }
            return new SchemaAndValue(listSchema, elements);
        }
        if (parser.canConsume(COMMA_DELIMITER)) {
            throw new DataException("Unable to parse an empty array element: " + parser.original());
        }

        SchemaAndValue element = parse(parser, true);
        elementSchema = commonSchemaFor(elementSchema, element);
        elements.add(element != null ? element.value() : null);

        // Peek for the end delimiter without consuming it, so that the check at the top of the
        // loop builds the result; anything else must be a comma.
        int currentPosition = parser.mark();
        if (parser.canConsume(ARRAY_END_DELIMITER)) {
            parser.rewindTo(currentPosition);
        } else if (!parser.canConsume(COMMA_DELIMITER)) {
            throw new DataException("Array elements missing '" + COMMA_DELIMITER + "' delimiter");
        }
    }
    // Missing either a comma or an end delimiter
    if (COMMA_DELIMITER.equals(parser.previous())) {
        throw new DataException("Array is missing element after ',': " + parser.original());
    }
    throw new DataException("Array is missing terminating ']': " + parser.original());
}

/**
 * Read the entries of a map whose opening delimiter has already been consumed. A comma after an
 * entry is consumed when present but is not required.
 *
 * @return the map (in insertion order) and its schema, built from the common key and value schemas
 * @throws DataException if an entry is empty, has a null key, lacks the entry delimiter, or the
 *         map is not terminated
 */
private static SchemaAndValue parseMapBody(Parser parser) {
    Map<Object, Object> entries = new LinkedHashMap<>();
    Schema keySchema = null;
    Schema valueSchema = null;
    while (parser.hasNext()) {
        if (parser.canConsume(MAP_END_DELIMITER)) {
            Schema mapSchema;
            if (keySchema != null && valueSchema != null) {
                mapSchema = SchemaBuilder.map(keySchema, valueSchema).build();
                entries = alignMapKeysAndValuesWithSchema(mapSchema, entries);
            } else if (keySchema != null) {
                mapSchema = SchemaBuilder.mapWithNullValues(keySchema);
                entries = alignMapKeysWithSchema(mapSchema, entries);
            } else {
                mapSchema = SchemaBuilder.mapOfNull().build();
            }
            return new SchemaAndValue(mapSchema, entries);
        }
        if (parser.canConsume(COMMA_DELIMITER)) {
            throw new DataException("Unable to parse a map entry with no key or value: " + parser.original());
        }

        SchemaAndValue key = parse(parser, true);
        if (key == null || key.value() == null) {
            throw new DataException("Map entry may not have a null key: " + parser.original());
        }
        if (!parser.canConsume(ENTRY_DELIMITER)) {
            throw new DataException("Map entry is missing '" + ENTRY_DELIMITER
                    + "' at " + parser.position()
                    + " in " + parser.original());
        }
        SchemaAndValue value = parse(parser, true);
        Object entryValue = value != null ? value.value() : null;
        entries.put(key.value(), entryValue);

        parser.canConsume(COMMA_DELIMITER);
        keySchema = commonSchemaFor(keySchema, key);
        valueSchema = commonSchemaFor(valueSchema, value);
    }
    // Missing either a comma or an end delimiter
    if (COMMA_DELIMITER.equals(parser.previous())) {
        throw new DataException("Map is missing element after ',': " + parser.original());
    }
    throw new DataException("Map is missing terminating '}': " + parser.original());
}

/**
 * Try to interpret a token that starts with a digit as a date, time or timestamp.
 *
 * <p>The time and timestamp literals may be split into 5 tokens since an unescaped colon is a
 * delimiter, so the token is first combined with the next 4 tokens. When that combination is a
 * temporal value those 4 tokens stay consumed; otherwise the parser is rewound and the token is
 * tried on its own.
 *
 * @return the temporal value, or {@code null} if neither form is a temporal literal
 */
private static SchemaAndValue parseTemporalToken(Parser parser, String token) {
    int position = parser.mark();
    String remainder = parser.next(4);
    if (remainder != null) {
        SchemaAndValue temporal = parseAsTemporal(token + remainder);
        if (temporal != null) {
            return temporal;
        }
    }
    // No match was found using the 5 tokens, so rewind and see if the current token has a date, time, or timestamp
    parser.rewindTo(position);
    return parseAsTemporal(token);
}

/**
 * Try to interpret a token as a number.
 *
 * <p>Whole numbers use the narrowest of INT8, INT16, INT32 and INT64 that holds them. Other values
 * become FLOAT64 when the double conversion is finite, except that an integer literal with scale
 * zero that does not fit in a long is kept as an exact {@link Decimal}: converting it to a double
 * would silently drop its low-order digits. Values whose double conversion is infinite are also
 * returned as {@link Decimal}.
 *
 * @return the numeric value, or {@code null} if the token is not a valid number
 */
private static SchemaAndValue parseNumericToken(String token) {
    final BigDecimal decimal;
    try {
        decimal = new BigDecimal(token);
    } catch (NumberFormatException ignored) {
        // can't parse as a number
        return null;
    }

    SchemaAndValue integer = narrowestIntegerFor(decimal);
    if (integer != null) {
        return integer;
    }

    double dValue = decimal.doubleValue();
    if (decimal.scale() != 0 && !Double.isInfinite(dValue)) {
        return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue);
    }
    Schema schema = Decimal.schema(decimal.scale());
    return new SchemaAndValue(schema, decimal);
}

/**
 * Return the value as the narrowest integer type that holds it exactly, or {@code null} when it
 * has a non-zero fractional part or does not fit in a long.
 */
private static SchemaAndValue narrowestIntegerFor(BigDecimal decimal) {
    final long integral;
    try {
        // One exact conversion followed by range checks selects the same type as trying
        // byteValueExact(), shortValueExact(), intValueExact() and longValueExact() in turn.
        integral = decimal.longValueExact();
    } catch (ArithmeticException ignored) {
        // Not a whole number, or outside the range of a long
        return null;
    }
    if (integral >= Byte.MIN_VALUE && integral <= Byte.MAX_VALUE) {
        return new SchemaAndValue(Schema.INT8_SCHEMA, (byte) integral);
    }
    if (integral >= Short.MIN_VALUE && integral <= Short.MAX_VALUE) {
        return new SchemaAndValue(Schema.INT16_SCHEMA, (short) integral);
    }
    if (integral >= Integer.MIN_VALUE && integral <= Integer.MAX_VALUE) {
        return new SchemaAndValue(Schema.INT32_SCHEMA, (int) integral);
    }
    return new SchemaAndValue(Schema.INT64_SCHEMA, integral);
}