in datafu-pig/src/main/java/datafu/pig/bags/BagGroup.java [88:149]
/**
 * Derives the output schema from an input of the form (BAG, PROJECTED_BAG).
 *
 * <p>Both input fields must be bags that carry an inner schema. The fields of the
 * second bag are used as the group keys. The returned schema holds a single bag
 * field whose tuples consist of a {@code group} field followed by the first input
 * bag field. With more than one key, {@code group} is a tuple of the key fields;
 * with exactly one key, {@code group} takes that key's type directly.
 *
 * <p>Side effect: the key aliases, prefixed with the bag name, are assigned to
 * {@code fieldNames} and stored in the instance properties under
 * {@code FIELD_NAMES_PROPERTY}.
 *
 * @param input schema of the arguments passed to this UDF
 * @return a schema containing one bag field describing the grouped output
 * @throws RuntimeException if {@code input} is null, does not have exactly two
 *         fields, either field is not a bag, or either bag has no inner schema;
 *         also wraps any {@code FrontendException} raised while reading or
 *         building schemas
 */
public Schema getOutputSchema(Schema input)
{
  try {
    // Fail with a descriptive message rather than a bare NullPointerException.
    if (input == null) {
      throw new RuntimeException("Expected input of format (BAG, PROJECTED_BAG...). Got no input schema.");
    }
    if (input.size() != 2) {
      throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %d field.", input.size()));
    }
    // Expect the first field to be a bag
    FieldSchema bagFieldSchema = input.getField(0);
    if (bagFieldSchema.type != DataType.BAG) {
      throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as first field.", DataType.findTypeName(bagFieldSchema.type)));
    }
    // Expect the second field to be a projection of the bag
    FieldSchema projectedBagFieldSchema = input.getField(1);
    if (projectedBagFieldSchema.type != DataType.BAG) {
      throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as second field.", DataType.findTypeName(projectedBagFieldSchema.type)));
    }
    // Both inner schemas are dereferenced below. These checks come after the
    // type checks so that the type-related errors above keep their precedence.
    if (bagFieldSchema.schema == null) {
      throw new RuntimeException("Expected input of format (BAG, PROJECTED_BAG...). Got a bag without a schema as first field.");
    }
    if (projectedBagFieldSchema.schema == null) {
      throw new RuntimeException("Expected input of format (BAG, PROJECTED_BAG...). Got a bag without a schema as second field.");
    }

    String bagName = bagFieldSchema.alias;
    // handle named tuples: when the bag wraps a single aliased tuple, the tuple
    // alias becomes part of the prefix used for the key field names
    if (bagFieldSchema.schema.size() == 1) {
      FieldSchema bagTupleFieldSchema = bagFieldSchema.schema.getField(0);
      if (bagTupleFieldSchema.type == DataType.TUPLE && bagTupleFieldSchema.alias != null) {
        bagName = getPrefixedAliasName(bagName, bagTupleFieldSchema.alias);
      }
    }
    // when the projected bag wraps a single tuple that has a schema, read the
    // key fields from that tuple instead of from the bag itself
    if (projectedBagFieldSchema.schema.size() == 1) {
      FieldSchema projectedBagTupleFieldSchema = projectedBagFieldSchema.schema.getField(0);
      if (projectedBagTupleFieldSchema.type == DataType.TUPLE && projectedBagTupleFieldSchema.schema != null) {
        projectedBagFieldSchema = projectedBagTupleFieldSchema;
      }
    }

    // create the output schema for the 'group'
    // store the field names for the group keys
    Schema keySchema = projectedBagFieldSchema.schema;
    int keyCount = keySchema.size();
    Schema groupTupleSchema = new Schema();
    fieldNames = new ArrayList<String>(keyCount);
    for (int i = 0; i < keyCount; i++) {
      FieldSchema fieldSchema = keySchema.getField(i);
      fieldNames.add(getPrefixedAliasName(bagName, fieldSchema.alias));
      // NOTE(review): only alias and type are copied here, so the inner schema of
      // a complex-typed key is not carried into the output - confirm this is intended.
      groupTupleSchema.add(new FieldSchema(fieldSchema.alias, fieldSchema.type));
    }
    getInstanceProperties().put(FIELD_NAMES_PROPERTY, fieldNames);

    Schema outputTupleSchema = new Schema();
    if (keyCount > 1) {
      // multiple group keys
      outputTupleSchema.add(new FieldSchema("group", groupTupleSchema, DataType.TUPLE));
    } else {
      // single group key
      outputTupleSchema.add(new FieldSchema("group", groupTupleSchema.getField(0).type));
    }
    outputTupleSchema.add(bagFieldSchema);

    return new Schema(new Schema.FieldSchema(
        getSchemaName(this.getClass().getName().toLowerCase(), input),
        outputTupleSchema,
        DataType.BAG));
  } catch (FrontendException e) {
    throw new RuntimeException(e);
  }
}