public Schema outputSchema()

in datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java [89:140]


    /**
     * Validates the input schema and derives the output schema of this UDF.
     *
     * <p>The first input field must be a bag whose element is a tuple with a known schema,
     * and the tuple field at position {@code weightIdx} must be of type int, long, float
     * or double. When validation succeeds, the output is a single bag field that reuses
     * the input bag's schema.
     *
     * @param input schema of the arguments passed to this UDF
     * @return a schema holding one bag field with the same inner schema as the input bag
     * @throws RuntimeException if the input does not have the expected shape, if the weight
     *         field index is out of range or refers to a non-numeric field, or if the schema
     *         lookup fails with a {@code FrontendException} (kept as the cause)
     */
    public Schema outputSchema(Schema input) {
      try {
        Schema.FieldSchema inputFieldSchema = input.getField(0);

        if (inputFieldSchema.type != DataType.BAG) {
          throw new RuntimeException("Expected a BAG as input");
        }

        Schema inputBagSchema = inputFieldSchema.schema;

        // A bag declared without an inner schema would otherwise fail below with a bare
        // NullPointerException; report it explicitly, like the tuple schema check does.
        if (inputBagSchema == null) {
          throw new RuntimeException("The input bag has no schema");
        }

        Schema.FieldSchema bagElementSchema = inputBagSchema.getField(0);

        if (bagElementSchema.type != DataType.TUPLE) {
          throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
                                                   DataType.findTypeName(bagElementSchema.type)));
        }

        Schema tupleSchema = bagElementSchema.schema;

        if (tupleSchema == null) {
          throw new RuntimeException("The tuple of input bag has no schema");
        }

        List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
        final int weightIndex = this.weightIdx;

        if (fieldSchemaList == null || fieldSchemaList.size() <= Math.max(0, weightIndex)) {
          throw new RuntimeException("The field schema of the input tuple is null " +
                                     "or the tuple size is no more than the weight field index: "
                                     + weightIndex);
        }

        // The size check above clamps the index at zero, so a negative index on a non-empty
        // tuple gets through it; reject it here instead of failing inside List.get().
        if (weightIndex < 0) {
          throw new RuntimeException("The weight field index must not be negative: " + weightIndex);
        }

        Schema.FieldSchema weightFieldSchema = fieldSchemaList.get(weightIndex);

        if (weightFieldSchema.type != DataType.INTEGER &&
            weightFieldSchema.type != DataType.LONG &&
            weightFieldSchema.type != DataType.FLOAT &&
            weightFieldSchema.type != DataType.DOUBLE) {
          String[] expectedTypes = new String[] {DataType.findTypeName(DataType.INTEGER),
                                                 DataType.findTypeName(DataType.LONG),
                                                 DataType.findTypeName(DataType.FLOAT),
                                                 DataType.findTypeName(DataType.DOUBLE)};
          throw new RuntimeException("Expect the type of the weight field of the input tuple to be of (" +
                  java.util.Arrays.toString(expectedTypes) + "), but instead found (" +
                  DataType.findTypeName(weightFieldSchema.type) + "), weight field: " +
                  weightIndex);
        }

        // The output bag carries exactly the same inner schema as the input bag.
        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
                                                 inputBagSchema, DataType.BAG));
      } catch (FrontendException e) {
        // The exception is preserved as the cause, so no separate stack trace is printed.
        throw new RuntimeException(e);
      }
    }