in datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java [383:477]
/**
 * Validates the schema of the input and describes the schema of the output.
 *
 * <p>The first input field must be a bag of node tuples. Each node tuple must have
 * exactly two fields, or exactly three when {@code enableNodeBiasing} is set:
 * <ol>
 *   <li>the source node ID, an INTEGER;</li>
 *   <li>the edges, a BAG of two-field tuples (INTEGER destination ID, DOUBLE weight);</li>
 *   <li>only when node biasing is enabled: the node bias, a DOUBLE.</li>
 * </ol>
 *
 * @param input schema of the arguments passed to this function
 * @return a schema with a single BAG field whose tuples are
 *         {@code (node: INTEGER, rank: FLOAT)}
 * @throws RuntimeException if the input schema is missing, incomplete, or does not
 *         match the layout above, or if reading the schema raises a
 *         {@code FrontendException} (which is wrapped as the cause)
 */
public Schema outputSchema(Schema input)
{
  try
  {
    // A missing schema at any nesting level is reported explicitly rather than
    // surfacing later as a bare NullPointerException.
    if (input == null)
    {
      throw new RuntimeException("Expected a BAG as input, but no input schema was provided");
    }

    Schema.FieldSchema inputFieldSchema = input.getField(0);
    if (inputFieldSchema.type != DataType.BAG)
    {
      throw new RuntimeException("Expected a BAG as input");
    }

    Schema inputBagSchema = inputFieldSchema.schema;
    if (inputBagSchema == null)
    {
      throw new RuntimeException("Expected input bag to declare a schema, but none was found");
    }

    Schema.FieldSchema nodeFieldSchema = inputBagSchema.getField(0);
    if (nodeFieldSchema.type != DataType.TUPLE)
    {
      throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
                                               DataType.findTypeName(nodeFieldSchema.type)));
    }

    Schema inputTupleSchema = nodeFieldSchema.schema;
    if (inputTupleSchema == null)
    {
      throw new RuntimeException("Expected node tuple to declare a schema, but none was found");
    }

    // The node tuple carries one extra field (the bias) when node biasing is enabled.
    if (!this.enableNodeBiasing)
    {
      if (inputTupleSchema.size() != 2)
      {
        throw new RuntimeException("Expected two fields for the node data");
      }
    }
    else
    {
      if (inputTupleSchema.size() != 3)
      {
        throw new RuntimeException("Expected three fields for the node data");
      }
    }

    Schema.FieldSchema sourceFieldSchema = inputTupleSchema.getField(0);
    if (sourceFieldSchema.type != DataType.INTEGER)
    {
      throw new RuntimeException(String.format("Expected source to be an INTEGER, but instead found %s",
                                               DataType.findTypeName(sourceFieldSchema.type)));
    }

    Schema.FieldSchema edgesFieldSchema = inputTupleSchema.getField(1);
    if (edgesFieldSchema.type != DataType.BAG)
    {
      throw new RuntimeException("Expected edges to be represented with a BAG");
    }

    if (this.enableNodeBiasing)
    {
      Schema.FieldSchema biasFieldSchema = inputTupleSchema.getField(2);
      if (biasFieldSchema.type != DataType.DOUBLE)
      {
        throw new RuntimeException(String.format("Expected node bias to be a DOUBLE, but instead found %s",
                                                 DataType.findTypeName(biasFieldSchema.type)));
      }
    }

    Schema edgesBagSchema = edgesFieldSchema.schema;
    if (edgesBagSchema == null)
    {
      throw new RuntimeException("Expected edges bag to declare a schema, but none was found");
    }

    Schema.FieldSchema edgeFieldSchema = edgesBagSchema.getField(0);
    if (edgeFieldSchema.type != DataType.TUPLE)
    {
      throw new RuntimeException(String.format("Expected edges field to contain a TUPLE, but instead found %s",
                                               DataType.findTypeName(edgeFieldSchema.type)));
    }

    Schema edgesTupleSchema = edgeFieldSchema.schema;
    if (edgesTupleSchema == null)
    {
      throw new RuntimeException("Expected edge tuple to declare a schema, but none was found");
    }

    if (edgesTupleSchema.size() != 2)
    {
      throw new RuntimeException("Expected two fields for the edge data");
    }

    Schema.FieldSchema destinationFieldSchema = edgesTupleSchema.getField(0);
    if (destinationFieldSchema.type != DataType.INTEGER)
    {
      throw new RuntimeException(String.format("Expected destination edge ID to be an INTEGER, but instead found %s",
                                               DataType.findTypeName(destinationFieldSchema.type)));
    }

    Schema.FieldSchema weightFieldSchema = edgesTupleSchema.getField(1);
    if (weightFieldSchema.type != DataType.DOUBLE)
    {
      throw new RuntimeException(String.format("Expected destination edge weight to be a DOUBLE, but instead found %s",
                                               DataType.findTypeName(weightFieldSchema.type)));
    }

    // Output: a bag of (node, rank) tuples.
    Schema tupleSchema = new Schema();
    tupleSchema.add(new Schema.FieldSchema("node", DataType.INTEGER));
    tupleSchema.add(new Schema.FieldSchema("rank", DataType.FLOAT));

    // Locale.ROOT keeps the generated alias independent of the JVM's default locale.
    String lowerCaseClassName = this.getClass().getName().toLowerCase(java.util.Locale.ROOT);

    return new Schema(new Schema.FieldSchema(getSchemaName(lowerCaseClassName, input),
                                             tupleSchema,
                                             DataType.BAG));
  }
  catch (FrontendException e)
  {
    throw new RuntimeException(e);
  }
}