in sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java [349:505]
/**
 * Converts a portable {@link SchemaApi.FieldType} proto into the corresponding Java {@link
 * FieldType}, dispatching on the proto's {@code type_info} case.
 *
 * <p>This method never reads the proto's nullable flag; presumably the caller applies it (see
 * the method name). Nested element, key, value and argument types are converted through {@code
 * fieldTypeFromProto}, and nested row schemas through {@code schemaFromProto}.
 *
 * @param protoFieldType the proto field type to convert
 * @return the equivalent Java field type
 * @throws IllegalArgumentException if the {@code type_info} case or the atomic type is
 *     unspecified or unknown
 * @throws RuntimeException if a standard logical type cannot be instantiated reflectively
 */
private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType protoFieldType) {
  switch (protoFieldType.getTypeInfoCase()) {
    case ATOMIC_TYPE:
      return atomicFieldTypeFromProto(protoFieldType.getAtomicType());
    case ROW_TYPE:
      return FieldType.row(schemaFromProto(protoFieldType.getRowType().getSchema()));
    case ARRAY_TYPE:
      return FieldType.array(fieldTypeFromProto(protoFieldType.getArrayType().getElementType()));
    case ITERABLE_TYPE:
      return FieldType.iterable(
          fieldTypeFromProto(protoFieldType.getIterableType().getElementType()));
    case MAP_TYPE:
      return FieldType.map(
          fieldTypeFromProto(protoFieldType.getMapType().getKeyType()),
          fieldTypeFromProto(protoFieldType.getMapType().getValueType()));
    case LOGICAL_TYPE:
      return logicalFieldTypeFromProto(protoFieldType.getLogicalType());
    default:
      throw new IllegalArgumentException(
          "Unexpected type_info: " + protoFieldType.getTypeInfoCase());
  }
}

/**
 * Maps a proto atomic type onto the Java primitive {@link FieldType} with the same name.
 *
 * @throws IllegalArgumentException if {@code atomicType} is UNSPECIFIED or not one of the
 *     handled values
 */
private static FieldType atomicFieldTypeFromProto(SchemaApi.AtomicType atomicType) {
  switch (atomicType) {
    case BYTE:
      return FieldType.of(TypeName.BYTE);
    case INT16:
      return FieldType.of(TypeName.INT16);
    case INT32:
      return FieldType.of(TypeName.INT32);
    case INT64:
      return FieldType.of(TypeName.INT64);
    case FLOAT:
      return FieldType.of(TypeName.FLOAT);
    case DOUBLE:
      return FieldType.of(TypeName.DOUBLE);
    case STRING:
      return FieldType.of(TypeName.STRING);
    case BOOLEAN:
      return FieldType.of(TypeName.BOOLEAN);
    case BYTES:
      return FieldType.of(TypeName.BYTES);
    case UNSPECIFIED:
      throw new IllegalArgumentException("Encountered UNSPECIFIED AtomicType");
    default:
      throw new IllegalArgumentException("Encountered unknown AtomicType: " + atomicType);
  }
}

/**
 * Converts a proto logical type into a Java {@link FieldType}.
 *
 * <p>Resolution order: (1) URNs registered in {@code STANDARD_LOGICAL_TYPES} are instantiated
 * reflectively; (2) the millis-instant and decimal URNs map to {@link FieldType#DATETIME} and
 * {@link FieldType#DECIMAL}; (3) other {@code beam:logical_type:} URNs carrying a payload are
 * recovered by deserializing that payload; (4) everything else, including a failed
 * deserialization, becomes an {@link UnknownLogicalType}.
 */
private static FieldType logicalFieldTypeFromProto(SchemaApi.LogicalType logicalType) {
  String urn = logicalType.getUrn();
  Class<? extends LogicalType<?, ?>> logicalTypeClass = STANDARD_LOGICAL_TYPES.get(urn);
  if (logicalTypeClass != null) {
    return standardLogicalFieldTypeFromProto(urn, logicalTypeClass, logicalType);
  }

  // Special-case for DATETIME and DECIMAL which are logical types in portable representation,
  // but not yet in Java. (https://github.com/apache/beam/issues/19817)
  if (urn.equals(URN_BEAM_LOGICAL_MILLIS_INSTANT)) {
    return FieldType.DATETIME;
  }
  if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) {
    return FieldType.DECIMAL;
  }

  if (urn.startsWith("beam:logical_type:")) {
    if (!logicalType.getPayload().isEmpty()) {
      // Logical type has a payload, try to recover the instance by deserialization.
      // NOTE(review): this deserializes bytes taken from the proto; confirm that schema protos
      // reaching this point always come from a trusted source.
      try {
        return FieldType.logicalType(
            (LogicalType)
                SerializableUtils.deserializeFromByteArray(
                    logicalType.getPayload().toByteArray(), "logicalType"));
      } catch (IllegalArgumentException e) {
        // Deliberate best-effort: fall through to UnknownLogicalType below. The exception is
        // handed to the logger as the trailing argument so the failure reason is not lost
        // (assumes LOG follows the SLF4J trailing-throwable convention).
        LOG.warn(
            "Unable to deserialize the logical type {} from proto. Mark as UnknownLogicalType.",
            urn,
            e);
      }
    } else {
      // Logical type does not have a payload. This happens when it is passed xlang.
      // TODO(yathu) it appears this path is called heavily, consider cache the instance
      LOG.debug("Constructing non-standard logical type {} as UnknownLogicalType", urn);
    }
  }

  // Assemble an UnknownLogicalType that carries the raw proto contents.
  @Nullable FieldType argumentType = null;
  @Nullable Object argumentValue = null;
  if (logicalType.hasArgumentType()) {
    argumentType = fieldTypeFromProto(logicalType.getArgumentType());
    argumentValue = fieldValueFromProto(argumentType, logicalType.getArgument());
  }
  return FieldType.logicalType(
      new UnknownLogicalType(
          urn,
          logicalType.getPayload().toByteArray(),
          argumentType,
          argumentValue,
          fieldTypeFromProto(logicalType.getRepresentation())));
}

/**
 * Reflectively instantiates a standard logical type class and wraps it in a {@link FieldType}.
 *
 * <p>When the proto carries an argument, the class's static {@code of(arg)} factory is invoked;
 * otherwise its public zero-argument constructor is used.
 *
 * @param urn the logical type URN, used only in error messages
 * @param logicalTypeClass the class registered for {@code urn}
 * @param logicalType the proto describing the logical type
 * @throws NullPointerException if the proto has an argument that decodes to {@code null}
 * @throws RuntimeException if the factory method or constructor is missing, inaccessible, or
 *     fails
 */
private static FieldType standardLogicalFieldTypeFromProto(
    String urn,
    Class<? extends LogicalType<?, ?>> logicalTypeClass,
    SchemaApi.LogicalType logicalType) {
  if (!logicalType.hasArgument()) {
    // Logical type without argument. Construct from constructor without parameter.
    try {
      return FieldType.logicalType(logicalTypeClass.getConstructor().newInstance());
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(
          String.format(
              "Standard logical type '%s' does not have a zero-argument constructor.", urn),
          e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(
          String.format(
              "Standard logical type '%s' has a zero-argument constructor, but it is not accessible.",
              urn),
          e);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(
          String.format(
              "Error instantiating logical type '%s' with zero-argument constructor.", urn),
          e);
    }
  }

  // Logical type with argument. Construct from a compatible of() method whose single
  // parameter type is either a primitive, List, Map, or Row.
  FieldType fieldType = fieldTypeFromProto(logicalType.getArgumentType());
  Object fieldValue =
      Objects.requireNonNull(fieldValueFromProto(fieldType, logicalType.getArgument()));

  // Pick the parameter type used to look up of(...): the runtime class of the value is usually
  // too specific (e.g. Integer or a concrete List implementation), so widen it.
  Class<?> clazz = fieldValue.getClass();
  if (ClassUtils.isPrimitiveWrapper(clazz)) {
    // argument is a primitive wrapper type (e.g. Integer)
    clazz = ClassUtils.wrapperToPrimitive(clazz);
  } else if (fieldValue instanceof Map) {
    // argument is Map
    clazz = Map.class;
  } else if (fieldValue instanceof Row) {
    // argument is Row
    clazz = Row.class;
  } else if (fieldValue instanceof List) {
    // argument is ArrayValue or iterableValue
    clazz = List.class;
  }

  String objectName = clazz.getName();
  try {
    // of(...) is expected to be static, hence the null receiver.
    return FieldType.logicalType(
        logicalTypeClass.cast(logicalTypeClass.getMethod("of", clazz).invoke(null, fieldValue)));
  } catch (NoSuchMethodException e) {
    throw new RuntimeException(
        String.format(
            "Standard logical type '%s' does not have a static of('%s') method.",
            urn, objectName),
        e);
  } catch (IllegalAccessException e) {
    throw new RuntimeException(
        String.format(
            "Standard logical type '%s' has an of('%s') method, but it is not accessible.",
            urn, objectName),
        e);
  } catch (InvocationTargetException e) {
    throw new RuntimeException(
        String.format(
            "Error instantiating logical type '%s' with of('%s') method.", urn, objectName),
        e);
  }
}