in datafu-pig/src/main/java/datafu/pig/bags/BagConcat.java [113:173]
/**
 * Derives the output schema: a single BAG field whose tuple schema is taken from the input bags.
 *
 * <p>Two input shapes are accepted:
 * <ul>
 *   <li><b>Tuple of bags</b> (more than one input field): every field must be a BAG with a
 *       non-null schema. The schema of the first bag is used for the output; the schemas of the
 *       remaining bags are not compared against it.</li>
 *   <li><b>Bag of bags</b> (exactly one input field): the field must be a BAG whose tuples hold
 *       exactly one field, itself a BAG with a non-null schema. That inner bag's schema is used
 *       for the output.</li>
 * </ul>
 *
 * @param input schema of the arguments passed to this function
 * @return a schema containing one BAG field, named via {@code getSchemaName}
 * @throws RuntimeException if the input has no fields or does not match either accepted shape;
 *         checked exceptions raised while inspecting the schema are wrapped, runtime exceptions
 *         are propagated unchanged
 */
public Schema outputSchema(Schema input)
{
  try {
    if (input.size() == 0) {
      throw new RuntimeException("Expected input tuple to contain fields. Got none.");
    }

    Schema outputBagTupleSchema;

    // determine whether the input is a tuple of bags or a bag of bags
    if (input.size() != 1) {
      // tuple of bags: verify that each element in the input is a bag with a known schema
      for (FieldSchema fieldSchema : input.getFields()) {
        if (fieldSchema.type != DataType.BAG) {
          throwBadTypeError("Expected a TUPLE of BAGs as input. Got instead: %s in schema: %s", fieldSchema.type, input);
        }
        if (fieldSchema.schema == null) {
          throwBadSchemaError(fieldSchema.alias, input);
        }
      }

      // the first bag's schema stands in for all of them
      outputBagTupleSchema = input.getField(0).schema;
    } else {
      // bag of bags: the single input element must be a bag
      FieldSchema outerBagField = input.getField(0);
      if (outerBagField.type != DataType.BAG) {
        throwBadTypeError("Expected a BAG of BAGs as input. Got instead: %s in schema: %s", outerBagField.type, input);
      }
      // a bag declared without a schema would otherwise surface as a NullPointerException below
      if (outerBagField.schema == null) {
        throwBadSchemaError(outerBagField.alias, input);
      }

      // take the tuple schema from this outer bag
      Schema outerBagTuple = outerBagField.schema;
      FieldSchema outerTupleField = outerBagTuple.getField(0);
      // same guard one level down: the outer bag's tuple must describe its fields
      if (outerTupleField.schema == null) {
        throwBadSchemaError(outerTupleField.alias, outerBagTuple);
      }

      // verify that this tuple contains only a single bag
      Schema outerBagSchema = outerTupleField.schema;
      if (outerBagSchema.size() != 1) {
        throw new RuntimeException(String.format("Expected outer bag to have only a single field. Got instead: %s",
            outerBagSchema.prettyPrint()));
      }

      FieldSchema innerBagField = outerBagSchema.getField(0);
      if (innerBagField.type != DataType.BAG) {
        // report the offending schema too, matching the other type errors in this method
        throwBadTypeError("Expected a BAG of BAGs as input. Got instead: %s in schema: %s", innerBagField.type, outerBagTuple);
      }
      if (innerBagField.schema == null) {
        throwBadSchemaError(innerBagField.alias, outerBagSchema);
      }

      // the inner bag's schema becomes the schema of the output bag
      outputBagTupleSchema = innerBagField.schema;
    }

    // return the correct schema
    return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
        outputBagTupleSchema, DataType.BAG));
  }
  catch (RuntimeException e) {
    // already unchecked: rethrow as-is so the message is not buried under a second wrapper
    throw e;
  }
  catch (Exception e) {
    throw new RuntimeException(e);
  }
}