in src/org/apache/pig/impl/logicalLayer/schema/Schema.java [1366:1521]
/**
 * Merges two schemas position by position, recursing into inner schemas
 * whenever the merged field type is a schema type (as reported by
 * {@code DataType.isSchemaType}).
 *
 * @param schema the first schema; may be null
 * @param other the second schema; may be null
 * @param otherTakesAliasPrecedence forwarded to {@code mergeAlias} when
 *        the aliases of two paired fields are merged
 * @param allowDifferentSizeMerge when false, schemas of different sizes
 *        are rejected; when true, the fields of the longer schema that have
 *        no counterpart are appended to the result as copies
 * @param allowIncompatibleTypes when true, a pair of fields whose types
 *        cannot be merged becomes a BYTEARRAY field, and merging a null
 *        schema with a non-null one yields null instead of an error
 * @return the merged schema; null if both inputs are null, or if exactly
 *         one input is null and {@code allowIncompatibleTypes} is true
 * @throws SchemaMergeException with error code 1029 if exactly one schema
 *         is null and incompatible types are not allowed; 1030 if the sizes
 *         differ and different-size merge is not allowed; 1031 if two field
 *         types cannot be merged and incompatible types are not allowed;
 *         2124 if a merged field schema cannot be constructed or if the two
 *         schemas disagree on their two-level-access flag
 */
public static Schema mergeSchema(Schema schema,
                                 Schema other,
                                 boolean otherTakesAliasPrecedence,
                                 boolean allowDifferentSizeMerge,
                                 boolean allowIncompatibleTypes)
        throws SchemaMergeException {

    if (schema == null && other == null) {
        // two absent schemas are not treated as incompatible
        return null;
    }

    // Exactly one side is null from here on; both cases are handled alike.
    if (schema == null || other == null) {
        if (allowIncompatibleTypes) {
            return null;
        }
        int errCode = 1029;
        String msg = "One of the schemas is null for merging schemas. Schema: " + schema + " Other schema: " + other;
        throw new SchemaMergeException(msg, errCode, PigException.INPUT);
    }

    if ((schema.size() != other.size()) && !allowDifferentSizeMerge) {
        int errCode = 1030;
        String msg = "Different schema sizes for merging schemas. Schema size: " + schema.size() + " Other schema size: " + other.size();
        throw new SchemaMergeException(msg, errCode, PigException.INPUT);
    }

    List<FieldSchema> mergedFields = new ArrayList<FieldSchema>();
    List<FieldSchema> myFields = schema.mFields;
    List<FieldSchema> otherFields = other.mFields;

    // Only the positions present in both schemas can be merged pairwise.
    int commonSize = Math.min(myFields.size(), otherFields.size());

    for (int pos = 0; pos < commonSize; pos++) {
        FieldSchema myFs = myFields.get(pos);
        FieldSchema otherFs = otherFields.get(pos);

        byte mergedType = DataType.mergeType(myFs.type, otherFs.type);
        if (mergedType == DataType.ERROR) {
            // The two types cannot be merged.
            if (!allowIncompatibleTypes) {
                int errCode = 1031;
                String msg = "Incompatible types for merging schemas. Field schema type: "
                        + DataType.findTypeName(myFs.type) + " Other field schema type: "
                        + DataType.findTypeName(otherFs.type);
                throw new SchemaMergeException(msg, errCode, PigException.INPUT);
            }
            // Incompatible types are tolerated: fall back to bytearray.
            mergedType = DataType.BYTEARRAY;
        }

        String mergedAlias = mergeAlias(myFs.alias,
                                        otherFs.alias,
                                        otherTakesAliasPrecedence);

        FieldSchema mergedFs;
        if (DataType.isSchemaType(mergedType)) {
            // Merge the inner schemas with the same settings. If they are
            // incompatible and allowIncompatibleTypes is false, the
            // recursive call throws.
            Schema mergedSubSchema = mergeSchema(myFs.schema,
                                                 otherFs.schema,
                                                 otherTakesAliasPrecedence,
                                                 allowDifferentSizeMerge,
                                                 allowIncompatibleTypes);
            // mergedSubSchema may be null here (both inner schemas null, or
            // one of them null while allowIncompatibleTypes is true).
            try {
                mergedFs = new FieldSchema(mergedAlias, mergedSubSchema, mergedType);
            } catch (FrontendException e) {
                int errCode = 2124;
                String errMsg = "Internal Error: Unexpected error creating field schema";
                throw new SchemaMergeException(errMsg, errCode, PigException.BUG, e);
            }
        } else {
            // plain field without an inner schema
            mergedFs = new FieldSchema(mergedAlias, mergedType);
        }
        mergedFields.add(mergedFs);
    }

    if (allowDifferentSizeMerge) {
        // At most one of the two lists extends past commonSize; the call
        // for the other list is a no-op.
        appendLeftoverFieldCopies(myFields, commonSize, mergedFields);
        appendLeftoverFieldCopies(otherFields, commonSize, mergedFields);
    }

    Schema result = new Schema(mergedFields);

    if (schema.isTwoLevelAccessRequired() != other.isTwoLevelAccessRequired()) {
        int errCode = 2124;
        String errMsg = "Cannot merge schema " + schema + " and " + other + ". One with twoLeverAccess flag, the other doesn't.";
        throw new SchemaMergeException(errMsg, errCode, PigException.BUG);
    }
    if (schema.isTwoLevelAccessRequired()) {
        result.setTwoLevelAccessRequired(true);
    }
    return result;
}

/**
 * Appends copies of the fields of {@code source}, starting at index
 * {@code from}, to {@code target}. Each copy carries the alias and type of
 * its original; for schema types the copy refers to the same inner schema
 * object as the original field.
 */
private static void appendLeftoverFieldCopies(List<FieldSchema> source,
                                              int from,
                                              List<FieldSchema> target) {
    for (int i = from; i < source.size(); i++) {
        FieldSchema fs = source.get(i);
        if (DataType.isSchemaType(fs.type)) {
            // The two-argument constructor does not take a type, so the
            // original type is assigned afterwards.
            FieldSchema copy = new FieldSchema(fs.alias, fs.schema);
            copy.type = fs.type;
            target.add(copy);
        } else {
            target.add(new FieldSchema(fs.alias, fs.type));
        }
    }
}