public static Schema mergeSchema()

in src/org/apache/pig/impl/logicalLayer/schema/Schema.java [1366:1521]


    public static Schema mergeSchema(Schema schema,
                               Schema other,
                               boolean otherTakesAliasPrecedence,
                               boolean allowDifferentSizeMerge,
                               boolean allowIncompatibleTypes)
                                    throws SchemaMergeException {
        if(schema == null && other == null){
            //if both are null, they are not incompatible
            return null;
        }
        if (schema == null) {
            if (allowIncompatibleTypes) {
                return null ;
            }
            else {
                int errCode = 1029;
                String msg = "One of the schemas is null for merging schemas. Schema: " + schema + " Other schema: " + other;
                throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
            }
        }

        if (other == null) {
            if (allowIncompatibleTypes) {
                return null ;
            }
            else {
                int errCode = 1029;
                String msg = "One of the schemas is null for merging schemas. Schema: " + schema + " Other schema: " + other;
                throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
            }
        }

        if ( (schema.size() != other.size()) &&
             (!allowDifferentSizeMerge) ) {
            int errCode = 1030;
            String msg = "Different schema sizes for merging schemas. Schema size: " + schema.size() + " Other schema size: " + other.size();
            throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
        }

        List<FieldSchema> outputList = new ArrayList<FieldSchema>() ;

        List<FieldSchema> mylist = schema.mFields ;
        List<FieldSchema> otherlist = other.mFields ;

        // We iterate up to the smaller one's size
        int iterateLimit = schema.mFields.size() > other.mFields.size()?
                            other.mFields.size() : schema.mFields.size() ;

        int idx = 0;
        for (; idx< iterateLimit ; idx ++) {

            // Just for readability
            FieldSchema myFs = mylist.get(idx) ;
            FieldSchema otherFs = otherlist.get(idx) ;

            byte mergedType = DataType.mergeType(myFs.type, otherFs.type) ;

            // If the types cannot be merged
            if (mergedType == DataType.ERROR) {
                // If  treatIncompatibleAsByteArray is true,
                // we will treat it as bytearray
                if (allowIncompatibleTypes) {
                    mergedType = DataType.BYTEARRAY ;
                }
                // otherwise the schemas cannot be merged
                else {
                    int errCode = 1031;
                    String msg = "Incompatible types for merging schemas. Field schema type: "
                        + DataType.findTypeName(myFs.type) + " Other field schema type: "
                        + DataType.findTypeName(otherFs.type);
                    throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
                }
            }

            String mergedAlias = mergeAlias(myFs.alias,
                                            otherFs.alias,
                                            otherTakesAliasPrecedence) ;

            FieldSchema mergedFs = null ;
            if (!DataType.isSchemaType(mergedType)) {
                // just normal merge
                mergedFs = new FieldSchema(mergedAlias, mergedType) ;
            }
            else {
                // merge inner tuple because both sides are tuples
                //if inner schema are incompatible and allowIncompatibleTypes==true
                // an exception is thrown by mergeSchema
                Schema mergedSubSchema = mergeSchema(myFs.schema,
                                                     otherFs.schema,
                                                     otherTakesAliasPrecedence,
                                                     allowDifferentSizeMerge,
                                                     allowIncompatibleTypes) ;

                // create the merged field
                // the mergedSubSchema can be true if allowIncompatibleTypes
                try {
                    mergedFs = new FieldSchema(mergedAlias, mergedSubSchema, mergedType) ;
                } catch (FrontendException e) {
                    int errCode = 2124;
                    String errMsg = "Internal Error: Unexpected error creating field schema";
                    throw new SchemaMergeException(errMsg, errCode, PigException.BUG, e);
                }

            }
            outputList.add(mergedFs) ;
        }

        // Handle different schema size
        if (allowDifferentSizeMerge) {
            
            // if the first schema has leftover, then append the rest
            for(int i=idx; i < mylist.size(); i++) {

                FieldSchema fs = mylist.get(i) ;

                // for non-schema types
                if (!DataType.isSchemaType(fs.type)) {
                    outputList.add(new FieldSchema(fs.alias, fs.type)) ;
                }
                // for TUPLE & BAG
                else {
                    FieldSchema tmp = new FieldSchema(fs.alias, fs.schema) ;
                    tmp.type = fs.type ;
                    outputList.add(tmp) ;
                }
            }

             // if the second schema has leftover, then append the rest
            for(int i=idx; i < otherlist.size(); i++) {

                FieldSchema fs = otherlist.get(i) ;

                // for non-schema types
                if (!DataType.isSchemaType(fs.type)) {
                    outputList.add(new FieldSchema(fs.alias, fs.type)) ;
                }
                // for TUPLE & BAG
                else {
                    FieldSchema tmp = new FieldSchema(fs.alias, fs.schema) ;
                    tmp.type = fs.type ;
                    outputList.add(tmp) ;
                }
            }

        }

        Schema result = new Schema(outputList);
        if (schema.isTwoLevelAccessRequired()!=other.isTwoLevelAccessRequired()) {
            int errCode = 2124;
            String errMsg = "Cannot merge schema " + schema + " and " + other + ". One with twoLeverAccess flag, the other doesn't.";
            throw new SchemaMergeException(errMsg, errCode, PigException.BUG);
        }
        if (schema.isTwoLevelAccessRequired())
            result.setTwoLevelAccessRequired(true);
        return result;
    }