in connect/api/src/main/java/org/apache/kafka/connect/data/Values.java [720:867]
protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException {
if (!parser.hasNext()) {
return null;
}
if (embedded) {
if (parser.canConsume(NULL_VALUE)) {
return null;
}
if (parser.canConsume(QUOTE_DELIMITER)) {
StringBuilder sb = new StringBuilder();
while (parser.hasNext()) {
if (parser.canConsume(QUOTE_DELIMITER)) {
break;
}
sb.append(parser.next());
}
return new SchemaAndValue(Schema.STRING_SCHEMA, sb.toString());
}
}
if (parser.canConsume(TRUE_LITERAL)) {
return TRUE_SCHEMA_AND_VALUE;
}
if (parser.canConsume(FALSE_LITERAL)) {
return FALSE_SCHEMA_AND_VALUE;
}
int startPosition = parser.mark();
try {
if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
List<Object> result = new ArrayList<>();
Schema elementSchema = null;
while (parser.hasNext()) {
if (parser.canConsume(ARRAY_END_DELIMITER)) {
Schema listSchema = elementSchema == null ? null : SchemaBuilder.array(elementSchema).schema();
result = alignListEntriesWithSchema(listSchema, result);
return new SchemaAndValue(listSchema, result);
}
SchemaAndValue element = parse(parser, true);
elementSchema = commonSchemaFor(elementSchema, element);
result.add(element.value());
parser.canConsume(COMMA_DELIMITER);
}
// Missing either a comma or an end delimiter
if (COMMA_DELIMITER.equals(parser.previous())) {
throw new DataException("Malformed array: missing element after ','");
}
throw new DataException("Malformed array: missing terminating ']'");
}
if (parser.canConsume(MAP_BEGIN_DELIMITER)) {
Map<Object, Object> result = new LinkedHashMap<>();
Schema keySchema = null;
Schema valueSchema = null;
while (parser.hasNext()) {
if (parser.canConsume(MAP_END_DELIMITER)) {
Schema mapSchema =
keySchema == null || valueSchema == null ? null : SchemaBuilder.map(keySchema, valueSchema).schema();
result = alignMapKeysAndValuesWithSchema(mapSchema, result);
return new SchemaAndValue(mapSchema, result);
}
SchemaAndValue key = parse(parser, true);
if (key == null || key.value() == null) {
throw new DataException("Malformed map entry: null key");
}
if (!parser.canConsume(ENTRY_DELIMITER)) {
throw new DataException("Malformed map entry: missing '='");
}
SchemaAndValue value = parse(parser, true);
Object entryValue = value != null ? value.value() : null;
result.put(key.value(), entryValue);
parser.canConsume(COMMA_DELIMITER);
keySchema = commonSchemaFor(keySchema, key);
valueSchema = commonSchemaFor(valueSchema, value);
}
// Missing either a comma or an end delimiter
if (COMMA_DELIMITER.equals(parser.previous())) {
throw new DataException("Malformed map: missing element after ','");
}
throw new DataException("Malformed array: missing terminating ']'");
}
} catch (DataException e) {
LOG.debug("Unable to parse the value as a map; reverting to string", e);
parser.rewindTo(startPosition);
}
String token = parser.next().trim();
assert !token.isEmpty(); // original can be empty string but is handled right away; no way for token to be empty here
char firstChar = token.charAt(0);
boolean firstCharIsDigit = Character.isDigit(firstChar);
if (firstCharIsDigit || firstChar == '+' || firstChar == '-') {
try {
// Try to parse as a number ...
BigDecimal decimal = new BigDecimal(token);
try {
return new SchemaAndValue(Schema.INT8_SCHEMA, decimal.byteValueExact());
} catch (ArithmeticException e) {
// continue
}
try {
return new SchemaAndValue(Schema.INT16_SCHEMA, decimal.shortValueExact());
} catch (ArithmeticException e) {
// continue
}
try {
return new SchemaAndValue(Schema.INT32_SCHEMA, decimal.intValueExact());
} catch (ArithmeticException e) {
// continue
}
try {
return new SchemaAndValue(Schema.INT64_SCHEMA, decimal.longValueExact());
} catch (ArithmeticException e) {
// continue
}
double dValue = decimal.doubleValue();
if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) {
return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue);
}
Schema schema = Decimal.schema(decimal.scale());
return new SchemaAndValue(schema, decimal);
} catch (NumberFormatException e) {
// can't parse as a number
}
}
if (firstCharIsDigit) {
// Check for a date, time, or timestamp ...
int tokenLength = token.length();
if (tokenLength == ISO_8601_DATE_LENGTH) {
try {
return new SchemaAndValue(Date.SCHEMA, new SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN).parse(token));
} catch (ParseException e) {
// not a valid date
}
} else if (tokenLength == ISO_8601_TIME_LENGTH) {
try {
return new SchemaAndValue(Time.SCHEMA, new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN).parse(token));
} catch (ParseException e) {
// not a valid date
}
} else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) {
try {
return new SchemaAndValue(Time.SCHEMA, new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(token));
} catch (ParseException e) {
// not a valid date
}
}
}
// At this point, the only thing this can be is a string. Embedded strings were processed above,
// so this is not embedded and we can use the original string...
return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original());
}