in src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java [429:541]
/**
 * Converts a single Pig field, given by its type code, description and
 * optional nested schema, into an Avro schema.
 *
 * <p>Every supported type except {@code DataType.NULL} is wrapped with
 * {@code createNullableUnion}. Tuples are converted via
 * {@code resourceSchemaToAvroSchema}; the converted schemas are remembered in
 * {@code definedRecordNames} under {@code name}, and a tuple whose converted
 * schema is not equal to any schema already registered under that name is
 * re-converted under the name {@code name + "_" + k}, where {@code k} is the
 * number of schemas already registered, so one name is not reused for two
 * different records.
 *
 * @param name name passed to nested conversions and used as the record-name
 *        key in {@code definedRecordNames}
 * @param nameSpace namespace forwarded to the nested conversion of map values
 *        (the bag and tuple branches pass {@code null} instead)
 * @param type Pig type code, one of the {@code DataType} constants
 * @param description Pig field description; for {@code BYTEARRAY} it is tried
 *        as an Avro schema definition, falling back to Avro bytes when it is
 *        absent or does not parse
 * @param schema nested Pig schema; required for bags, maps and tuples
 * @param definedRecordNames record name to the schemas already emitted under
 *        that name; updated in place by the tuple branch
 * @param doubleColonsToDoubleUnderscores forwarded unchanged to nested
 *        conversions (presumably controls rewriting of "::" in names — confirm
 *        against resourceSchemaToAvroSchema)
 * @return the Avro schema for the field
 * @throws IOException if the type is not supported, or a bag, map or tuple
 *         lacks the schema needed to type its contents
 */
private static Schema resourceFieldSchemaToAvroSchema(
        final String name, final String nameSpace,
        final byte type, final String description,
        final ResourceSchema schema,
        final Map<String, List<Schema>> definedRecordNames,
        final Boolean doubleColonsToDoubleUnderscores)
        throws IOException {
    switch (type) {
        case DataType.BAG: {
            // An untyped bag has no nested schema; fail with the same message
            // used when the element type cannot be converted, instead of an NPE.
            if (schema == null) {
                throw new IOException("AvroStorage can't save bags with untyped values; please specify a value type or a schema.");
            }
            Schema innerBagSchema = resourceSchemaToAvroSchema(
                    schema.getFields()[0].getSchema(), name, null,
                    definedRecordNames,
                    doubleColonsToDoubleUnderscores);
            if (innerBagSchema == null) {
                throw new IOException("AvroStorage can't save bags with untyped values; please specify a value type or a schema.");
            }
            return createNullableUnion(Schema.createArray(innerBagSchema));
        }
        case DataType.BIGCHARARRAY:
        case DataType.CHARARRAY:
            return createNullableUnion(Type.STRING);
        case DataType.BOOLEAN:
            return createNullableUnion(Type.BOOLEAN);
        case DataType.BYTEARRAY: {
            // The description may carry an Avro schema (e.g. a fixed type);
            // anything that does not parse is stored as plain bytes.
            Schema fixedSchema = null;
            if (description != null) {
                try {
                    fixedSchema = (new Schema.Parser()).parse(description);
                } catch (Exception ignored) {
                    // Not an Avro schema definition: deliberately fall back to bytes.
                    fixedSchema = null;
                }
            }
            if (fixedSchema == null) {
                return createNullableUnion(Type.BYTES);
            }
            return createNullableUnion(fixedSchema);
        }
        case DataType.DATETIME:
            return createNullableUnion(Type.LONG);
        case DataType.DOUBLE:
            return createNullableUnion(Type.DOUBLE);
        case DataType.FLOAT:
            return createNullableUnion(Type.FLOAT);
        case DataType.INTEGER:
            return createNullableUnion(Type.INT);
        case DataType.LONG:
            return createNullableUnion(Type.LONG);
        case DataType.MAP: {
            if (schema == null) {
                throw new IOException("AvroStorage can't save maps with untyped values; please specify a value type or a schema.");
            }
            byte innerType = schema.getFields()[0].getType();
            String desc = schema.getFields()[0].getDescription();
            // The generic placeholder description carries no information.
            if ("autogenerated from Pig Field Schema".equals(desc)) {
                desc = null;
            }
            Schema valueSchema;
            if (DataType.isComplex(innerType)) {
                valueSchema = resourceSchemaToAvroSchema(
                        schema.getFields()[0].getSchema(),
                        name, nameSpace, definedRecordNames,
                        doubleColonsToDoubleUnderscores);
                // A null value schema would produce an unusable Avro map type.
                if (valueSchema == null) {
                    throw new IOException("AvroStorage can't save maps with untyped values; please specify a value type or a schema.");
                }
            } else {
                valueSchema = resourceFieldSchemaToAvroSchema(
                        name, nameSpace, innerType,
                        desc, null, definedRecordNames,
                        doubleColonsToDoubleUnderscores);
            }
            return createNullableUnion(Schema.createMap(valueSchema));
        }
        case DataType.NULL:
            return Schema.create(Type.NULL);
        case DataType.TUPLE: {
            if (schema == null) {
                throw new IOException("AvroStorage can't save tuples with untyped values; please specify a value type or a schema.");
            }
            Schema returnSchema = createNullableUnion(
                    resourceSchemaToAvroSchema(schema, name, null,
                            definedRecordNames, doubleColonsToDoubleUnderscores));
            List<Schema> schemaList = definedRecordNames.get(name);
            if (schemaList == null) {
                definedRecordNames.put(name, Lists.newArrayList(returnSchema));
            } else if (!schemaList.contains(returnSchema)) {
                // Search every schema registered under this name (previously
                // only the first was compared); if none matches, emit the
                // record under a suffixed name and register it.
                returnSchema = createNullableUnion(resourceSchemaToAvroSchema(
                        schema, name + "_" + schemaList.size(),
                        null, definedRecordNames, doubleColonsToDoubleUnderscores));
                definedRecordNames.get(name).add(returnSchema);
            }
            return returnSchema;
        }
        case DataType.BYTE:
        case DataType.ERROR:
        case DataType.GENERIC_WRITABLECOMPARABLE:
        case DataType.INTERNALMAP:
        case DataType.UNKNOWN:
        default:
            throw new IOException(
                    "Don't know how to encode type "
                    + DataType.findTypeName(type) + " in schema "
                    + ((schema == null) ? "" : schema.toString())
                    + "\n");
    }
}