public Schema outputSchema()

in datafu-pig/src/main/java/datafu/pig/bags/CountEach.java [115:163]


  /**
   * Derives the output schema from the schema of the single input bag.
   *
   * <p>The input must consist of exactly one field, a BAG whose first field is a TUPLE.
   * The returned schema holds a single BAG field, named via {@code getSchemaName} from the
   * lower-cased class name, whose inner schema is:
   * <ul>
   *   <li>when {@code flatten} is set: a copy of the input tuple's schema followed by an
   *       INTEGER field named "count";</li>
   *   <li>otherwise: a TUPLE field named "tuple_schema" carrying a copy of the input tuple's
   *       schema, followed by an INTEGER field named "count".</li>
   * </ul>
   * If the input tuple carries no schema, an empty schema is used in its place.
   *
   * @param input schema of the arguments passed to this function
   * @return a schema with one BAG field describing the output
   * @throws RuntimeException if {@code input} is null or does not have exactly one field,
   *         if that field is not a BAG, if the bag carries no schema, if the bag's first
   *         field is not a TUPLE, or wrapping a {@code CloneNotSupportedException} or
   *         {@code FrontendException} raised while building the schema
   */
  public Schema outputSchema(Schema input)
  {
    try {
      if (input == null || input.size() != 1)
      {
        throw new RuntimeException("Expected input to have one field");
      }

      Schema.FieldSchema bagFieldSchema = input.getField(0);

      if (bagFieldSchema.type != DataType.BAG)
      {
        throw new RuntimeException("Expected a BAG as input");
      }

      Schema inputBagSchema = bagFieldSchema.schema;

      // A bag without an inner schema has nothing to inspect; report that explicitly
      // rather than failing with a NullPointerException on the lookup below.
      if (inputBagSchema == null)
      {
        throw new RuntimeException("Expected input bag to have a schema");
      }

      Schema.FieldSchema tupleFieldSchema = inputBagSchema.getField(0);

      if (tupleFieldSchema.type != DataType.TUPLE)
      {
        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
                                                 DataType.findTypeName(tupleFieldSchema.type)));
      }

      // A tuple without a declared schema is treated as having no known fields.
      Schema inputTupleSchema = tupleFieldSchema.schema;
      if (inputTupleSchema == null) inputTupleSchema = new Schema();

      Schema outputTupleSchema;

      if (this.flatten) {
        // Flattened output: the original tuple fields appear directly in the output tuple.
        outputTupleSchema = inputTupleSchema.clone();
      } else {
        // Nested output: the original tuple is kept as a single TUPLE field.
        outputTupleSchema = new Schema();
        outputTupleSchema.add(new Schema.FieldSchema("tuple_schema", inputTupleSchema.clone(), DataType.TUPLE));
      }

      // In both layouts the count is the last field.
      outputTupleSchema.add(new Schema.FieldSchema("count", DataType.INTEGER));

      return new Schema(new Schema.FieldSchema(
            getSchemaName(this.getClass().getName().toLowerCase(), input),
            outputTupleSchema,
            DataType.BAG));
    }
    catch (CloneNotSupportedException e) {
      throw new RuntimeException(e);
    }
    catch (FrontendException e) {
      throw new RuntimeException(e);
    }
  }