public Schema outputSchema()

in datafu-pig/src/main/java/datafu/pig/bags/BagConcat.java [113:173]


  /**
   * Derives the output schema of this UDF from the input schema.
   *
   * <p>Two input shapes are accepted:
   * <ul>
   *   <li><b>Tuple of bags</b> (more than one input field): every field must be a
   *       BAG with a non-null schema. The schema of the <em>first</em> bag is used
   *       for the output bag; the schemas of the remaining bags are not compared
   *       against it here.</li>
   *   <li><b>Bag of bags</b> (exactly one input field): the field must be a BAG
   *       whose tuple has exactly one field, itself a BAG with a non-null schema.
   *       That inner bag's schema is used for the output bag.</li>
   * </ul>
   *
   * @param input schema of the arguments passed to the UDF
   * @return a schema with a single BAG field, named via {@code getSchemaName}
   * @throws RuntimeException if the input schema is null, empty, or does not match
   *         one of the shapes above; checked exceptions raised while inspecting
   *         the schema are wrapped in a RuntimeException
   */
  public Schema outputSchema(Schema input)
  {
    try {
      if (input == null) {
        throw new RuntimeException("Expected input schema to be non-null.");
      }
      if (input.size() == 0) {
        throw new RuntimeException("Expected input tuple to contain fields. Got none.");
      }

      Schema outputBagTupleSchema;

      // determine whether the input is a tuple of bags or a bag of bags
      if (input.size() != 1) {
        // tuple of bags: verify that each element in the input is a bag with a schema
        // NOTE(review): the throwBad*Error helpers are assumed to always throw,
        // as the original control flow already relied on that -- confirm.
        for (FieldSchema fieldSchema : input.getFields()) {
          if (fieldSchema.type != DataType.BAG) {
            throwBadTypeError("Expected a TUPLE of BAGs as input.  Got instead: %s in schema: %s", fieldSchema.type, input);
          }
          if (fieldSchema.schema == null) {
            throwBadSchemaError(fieldSchema.alias, input);
          }
        }

        // the first bag's schema stands in for all of them
        outputBagTupleSchema = input.getField(0).schema;
      } else {
        // bag of bags: the single input element must be a bag
        FieldSchema outerBagField = input.getField(0);
        if (outerBagField.type != DataType.BAG) {
          throwBadTypeError("Expected a BAG of BAGs as input.  Got instead: %s in schema: %s", outerBagField.type, input);
        }

        // schema attached to the outer bag; without it the nesting cannot be inspected
        Schema outerBagSchema = outerBagField.schema;
        if (outerBagSchema == null) {
          throwBadSchemaError(outerBagField.alias, input);
        }

        // first field of the outer bag's schema -- presumably the tuple the bag holds
        FieldSchema outerTupleField = outerBagSchema.getField(0);
        Schema outerTupleSchema = outerTupleField.schema;
        if (outerTupleSchema == null) {
          throwBadSchemaError(outerTupleField.alias, outerBagSchema);
        }

        // that tuple must contain exactly one field, and it must be a bag
        if (outerTupleSchema.size() != 1) {
          throw new RuntimeException(String.format("Expected outer bag to have only a single field.  Got instead: %s", 
                                                   outerTupleSchema.prettyPrint()));
        }
        FieldSchema innerBagField = outerTupleSchema.getField(0);
        if (innerBagField.type != DataType.BAG) {
          throwBadTypeError("Expected a BAG of BAGs as input.  Got instead: %s in schema: %s", innerBagField.type, outerBagSchema);
        }
        if (innerBagField.schema == null) {
          throwBadSchemaError(innerBagField.alias, outerTupleSchema);
        }

        // the inner bag's schema becomes the schema of the output bag
        outputBagTupleSchema = innerBagField.schema;
      }

      // wrap the chosen schema in a single BAG field
      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
                                               outputBagTupleSchema, DataType.BAG));
    }
    catch (RuntimeException e) {
      // already unchecked: rethrow as-is so the message is not buried under a second wrapper
      throw e;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }