in src/org/apache/pig/newplan/logical/relational/LogicalSchema.java [376:482]
public static LogicalFieldSchema merge(LogicalFieldSchema fs1, LogicalFieldSchema fs2, MergeMode mode) throws FrontendException {
// deal with null schema
if (mode==MergeMode.LoadForEach) {
if (fs1==null) throw new FrontendException("We cannot cast into null", 1031);
if (fs2==null) return fs1.deepCopy();
} else if (mode==MergeMode.LoadForEachInner) {
if (fs1==null)
return null;
if (fs2==null)
return fs1.deepCopy();
} else { // Union/UnionInner
if(fs1==null||fs2==null)
return null;
}
String mergedAlias;
byte mergedType = DataType.UNKNOWN;
LogicalSchema mergedSubSchema = null;
// Infer merged data type
if (mode==MergeMode.UnionInner) {
if (fs1.type!=fs2.type)
// We don't merge inner schema of different type for union, throw exception
throw new FrontendException("Incompatible field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031);
else
mergedType = fs1.type;
}
else if (mode==MergeMode.LoadForEach||mode==MergeMode.LoadForEachInner) {
if (fs1.type==DataType.NULL||fs1.type==DataType.BYTEARRAY) // If declared schema does not have type part
mergedType = fs2.type;
else if (!DataType.castable(fs1.type, fs2.type))
throw new FrontendException("Incompatible field schema: declared is \"" + fs1.toString(false) + "\", infered is \"" + fs2.toString(false) + "\"", 1031);
else mergedType = fs1.type; // If compatible type, we take the declared type
}
else {
// Union schema
if (fs1.type==DataType.BYTEARRAY) {
mergedType=fs2.type;
} else if (fs2.type==DataType.BYTEARRAY) {
mergedType = fs1.type;
}
else {
// Take the more specific type
mergedType = DataType.mergeType(fs1.type, fs2.type);
if (mergedType == DataType.ERROR) {
// True incompatible, set to bytearray
mergedType = DataType.BYTEARRAY;
}
}
}
if (fs1.alias==null)
mergedAlias = fs2.alias;
else if (fs2.alias==null)
mergedAlias = fs1.alias;
else {
mergedAlias = mergeNameSpacedAlias(fs1.alias, fs2.alias);
if (mergedAlias==null)
mergedAlias = fs1.alias;
}
if (DataType.isSchemaType(mergedType)) {
if (mode==MergeMode.Union) {
try {
if (fs1.type==DataType.BYTEARRAY) {
if (fs2.schema!=null)
mergedSubSchema = fs2.schema.deepCopy();
}
else if (fs2.type==DataType.BYTEARRAY) {
if (fs1.schema!=null)
mergedSubSchema = fs1.schema.deepCopy();
}
else {
mergedSubSchema = LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.UnionInner);
}
} catch (FrontendException e) {
if(fs1.type == DataType.BAG && fs2.type == DataType.BAG){
//create an empty tuple as subschema
mergedSubSchema = new LogicalSchema();
mergedSubSchema.addField(new LogicalFieldSchema(null, new LogicalSchema(), DataType.TUPLE));
}else if(fs1.type == DataType.TUPLE && fs2.type == DataType.TUPLE){
mergedSubSchema = new LogicalSchema();
}
// If inner schema is not compatible, mergedSubSchema set to null
}
}
else {
if (mode==MergeMode.UnionInner)
mergedSubSchema = LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.UnionInner);
else {
// LoadForEach/LoadForEachInner
if (fs1.type==DataType.BYTEARRAY)
mergedSubSchema = fs2.schema;
else {
try {
// Only check compatibility
mergedSubSchema = LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.LoadForEachInner);
} catch (FrontendException e) {
throw new FrontendException("Incompatible field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031);
}
}
}
}
}
LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
return mergedFS;
}