in datafu-pig/src/main/java/datafu/pig/bags/CountEach.java [115:163]
/**
 * Builds the output schema for this UDF from the schema of its single bag argument.
 *
 * <p>The input must consist of exactly one field of type BAG whose first field is a
 * TUPLE. The result is a schema with one BAG field whose tuples are either:
 * <ul>
 *   <li>when {@code flatten} is set: the input tuple's fields followed by an INTEGER
 *       field named {@code count}; or</li>
 *   <li>otherwise: a TUPLE field named {@code tuple_schema} carrying the input tuple's
 *       schema, followed by an INTEGER field named {@code count}.</li>
 * </ul>
 *
 * @param input schema of the arguments passed to this UDF
 * @return a schema containing a single BAG field describing the output
 * @throws RuntimeException if the input is missing, does not have exactly one field,
 *         is not a BAG, has no bag schema, does not contain a TUPLE, or if cloning or
 *         schema access fails
 */
public Schema outputSchema(Schema input)
{
  try {
    // A missing input schema is reported the same way as a wrong field count
    // instead of surfacing as a NullPointerException.
    if (input == null || input.size() != 1)
    {
      throw new RuntimeException("Expected input to have one field");
    }

    Schema.FieldSchema bagFieldSchema = input.getField(0);
    if (bagFieldSchema.type != DataType.BAG)
    {
      throw new RuntimeException("Expected a BAG as input");
    }

    Schema inputBagSchema = bagFieldSchema.schema;
    // Without this guard a bag field that carries no schema would fail below with a
    // bare NullPointerException that gives the user no hint about what went wrong.
    if (inputBagSchema == null)
    {
      throw new RuntimeException("Expected input bag to have a schema");
    }

    Schema.FieldSchema bagElementSchema = inputBagSchema.getField(0);
    if (bagElementSchema.type != DataType.TUPLE)
    {
      throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
                                               DataType.findTypeName(bagElementSchema.type)));
    }

    // A tuple with no declared fields is treated as an empty schema.
    Schema inputTupleSchema = bagElementSchema.schema;
    if (inputTupleSchema == null)
    {
      inputTupleSchema = new Schema();
    }

    Schema outputTupleSchema;
    if (this.flatten) {
      // Flattened form: the original tuple fields appear directly in the output tuple.
      outputTupleSchema = inputTupleSchema.clone();
    } else {
      // Nested form: the original tuple is kept as a single TUPLE-typed field.
      outputTupleSchema = new Schema();
      outputTupleSchema.add(new Schema.FieldSchema("tuple_schema", inputTupleSchema.clone(), DataType.TUPLE));
    }
    // Both forms end with the integer count field.
    outputTupleSchema.add(new Schema.FieldSchema("count", DataType.INTEGER));

    return new Schema(new Schema.FieldSchema(
        getSchemaName(this.getClass().getName().toLowerCase(), input),
        outputTupleSchema,
        DataType.BAG));
  }
  catch (CloneNotSupportedException e) {
    throw new RuntimeException(e);
  }
  catch (FrontendException e) {
    throw new RuntimeException(e);
  }
}