public Schema outputSchema()

in datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java [383:477]


  /**
   * Validates the schema of the UDF input and describes the schema of its output.
   *
   * <p>The first input field must be a BAG of TUPLEs. Each tuple must have the layout
   * {@code (source:INTEGER, edges:BAG{(dest:INTEGER, weight:DOUBLE)})}; when node biasing
   * is enabled a third DOUBLE field holding the node bias is required as well.
   *
   * @param input schema of the arguments passed to the UDF
   * @return a schema with a single BAG field containing {@code (node:INTEGER, rank:FLOAT)} tuples
   * @throws RuntimeException if the input schema does not match the expected layout, or
   *         wrapping any {@code FrontendException} raised while inspecting the schema
   */
  public Schema outputSchema(Schema input)
  {
    try
    {
      // A missing input schema would otherwise surface as a bare NullPointerException.
      if (input == null)
      {
        throw new RuntimeException("Expected a BAG as input");
      }

      Schema.FieldSchema inputFieldSchema = input.getField(0);

      if (inputFieldSchema.type != DataType.BAG)
      {
        throw new RuntimeException("Expected a BAG as input");
      }

      Schema inputBagSchema = inputFieldSchema.schema;

      // A bag field may carry no inner schema; fail with a descriptive message instead of an NPE.
      if (inputBagSchema == null)
      {
        throw new RuntimeException("Expected input bag to declare a schema");
      }

      Schema.FieldSchema nodeFieldSchema = inputBagSchema.getField(0);

      if (nodeFieldSchema.type != DataType.TUPLE)
      {
        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
                                                 DataType.findTypeName(nodeFieldSchema.type)));
      }

      Schema inputTupleSchema = nodeFieldSchema.schema;

      if (inputTupleSchema == null)
      {
        throw new RuntimeException("Expected input tuple to declare a schema");
      }

      // Node tuples carry an extra bias field only when node biasing is enabled.
      if (!this.enableNodeBiasing)
      {
        if (inputTupleSchema.size() != 2)
        {
          throw new RuntimeException("Expected two fields for the node data");
        }
      }
      else
      {
        if (inputTupleSchema.size() != 3)
        {
          throw new RuntimeException("Expected three fields for the node data");
        }
      }

      Schema.FieldSchema sourceFieldSchema = inputTupleSchema.getField(0);

      if (sourceFieldSchema.type != DataType.INTEGER)
      {
        throw new RuntimeException(String.format("Expected source to be an INTEGER, but instead found %s",
                                                 DataType.findTypeName(sourceFieldSchema.type)));
      }

      Schema.FieldSchema edgesFieldSchema = inputTupleSchema.getField(1);

      if (edgesFieldSchema.type != DataType.BAG)
      {
        throw new RuntimeException("Expected edges to be represented with a BAG");
      }

      if (this.enableNodeBiasing && inputTupleSchema.getField(2).type != DataType.DOUBLE)
      {
        throw new RuntimeException(String.format("Expected node bias to be a DOUBLE, but instead found %s",
                                                 DataType.findTypeName(inputTupleSchema.getField(2).type)));
      }

      // The edges bag may also have been declared without an inner schema.
      if (edgesFieldSchema.schema == null)
      {
        throw new RuntimeException("Expected edges bag to declare a schema");
      }

      Schema.FieldSchema edgeFieldSchema = edgesFieldSchema.schema.getField(0);

      if (edgeFieldSchema.type != DataType.TUPLE)
      {
        throw new RuntimeException(String.format("Expected edges field to contain a TUPLE, but instead found %s",
                                                 DataType.findTypeName(edgeFieldSchema.type)));
      }

      Schema edgesTupleSchema = edgeFieldSchema.schema;

      if (edgesTupleSchema == null)
      {
        throw new RuntimeException("Expected edge tuple to declare a schema");
      }

      if (edgesTupleSchema.size() != 2)
      {
        throw new RuntimeException("Expected two fields for the edge data");
      }

      if (edgesTupleSchema.getField(0).type != DataType.INTEGER)
      {
        throw new RuntimeException(String.format("Expected destination edge ID to be an INTEGER, but instead found %s",
                                                 DataType.findTypeName(edgesTupleSchema.getField(0).type)));
      }

      if (edgesTupleSchema.getField(1).type != DataType.DOUBLE)
      {
        throw new RuntimeException(String.format("Expected destination edge weight to be a DOUBLE, but instead found %s",
                                                 DataType.findTypeName(edgesTupleSchema.getField(1).type)));
      }

      // Output: a bag of (node, rank) tuples, named after this class and the input schema.
      Schema tupleSchema = new Schema();
      tupleSchema.add(new Schema.FieldSchema("node",DataType.INTEGER));
      tupleSchema.add(new Schema.FieldSchema("rank",DataType.FLOAT));

      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
                                                                 .getName()
                                                                 .toLowerCase(), input),
                                               tupleSchema,
                                               DataType.BAG));
    }
    catch (FrontendException e)
    {
      throw new RuntimeException(e);
    }
  }