in datafu-pig/src/main/java/datafu/pig/bags/BagJoin.java [249:306]
/**
 * Computes the output schema of the join and records per-bag metadata in the instance
 * properties.
 *
 * <p>The input is expected to be a sequence of (bag, 'field') pairs: every even position holds
 * a bag and the following odd position holds the join key name for that bag. Only the bag
 * positions are inspected here.
 *
 * <p>The returned schema contains a single bag field (named via {@code getSchemaName}). Its
 * fields are every field of every input bag's inner tuple, renamed to
 * {@code <bagAlias>::<fieldAlias>}. A missing alias is rendered as {@code "null"}.
 *
 * <p>Side effect: the bag names, the join-key prefix per bag and the inner tuple size per bag
 * are stored in the instance properties under {@code BAG_NAMES_PROPERTY},
 * {@code BAG_NAME_TO_JOIN_PREFIX_PROPERTY} and {@code BAG_NAME_TO_SIZE_PROPERTY}. The entries of
 * the bag-name list are exactly the keys used in both maps. NOTE(review): these are presumably
 * read back at execution time; confirm against {@code exec}.
 *
 * @param input schema of the UDF arguments
 * @return single-field schema describing the joined output bag
 * @throws RuntimeException if a bag position does not hold a bag with a schema, or wrapping a
 *     {@link FrontendException} raised while reading or building schemas
 */
public Schema getOutputSchema(Schema input)
{
  ArrayList<String> bagNames = new ArrayList<String>(input.size() / 2);
  Map<String, String> bagNameToJoinPrefix = new HashMap<String, String>(input.size() / 2);
  Map<String, Integer> bagNameToSize = new HashMap<String, Integer>(input.size() / 2);
  Schema outputSchema;
  Schema bagSchema = new Schema();
  try {
    // All even fields should be bags; odd fields are key names and are skipped here.
    for (int index = 0; index < input.size(); index += 2) {
      FieldSchema outerField = input.getField(index);
      // Normalize a missing alias once. The same name is then used in bagNames and as the
      // key of both maps, so lookups keyed by bagNames entries stay consistent.
      String bagName = outerField.alias == null ? "null" : outerField.alias;
      if (outerField.type != DataType.BAG || outerField.schema == null) {
        throw new RuntimeException("Expected input format of (bag, 'field') pairs. "
            +"Did not receive a bag at index: "+index+", alias: "+bagName+". "
            +"Instead received type: "+DataType.findTypeName(outerField.type)+" in schema:"+input.toString());
      }
      bagNames.add(bagName);

      // A bag's schema is expected to wrap a single tuple field describing its elements.
      FieldSchema tupleField = outerField.schema.getField(0);
      String tupleName = tupleField.alias;
      // Note: the prefix is built from the raw (possibly null) alias, as before.
      bagNameToJoinPrefix.put(bagName, getPrefixedAliasName(outerField.alias, tupleName));

      if (tupleField.schema == null) {
        // No inner schema: no output fields and no size entry are recorded for this bag.
        log.error(String.format("could not get schema for inner tuple %s in bag %s", tupleName, bagName));
        continue;
      }

      bagNameToSize.put(bagName, tupleField.schema.size());
      for (FieldSchema innerField : tupleField.schema.getFields()) {
        String innerFieldName = innerField.alias == null ? "null" : innerField.alias;
        String outputFieldName = bagName + "::" + innerFieldName;
        // Preserve nested schemas (e.g. inner bags/tuples) when present.
        if (innerField.schema != null) {
          bagSchema.add(new FieldSchema(outputFieldName, innerField.schema, innerField.type));
        } else {
          bagSchema.add(new FieldSchema(outputFieldName, innerField.type));
        }
      }
    }
    outputSchema = new Schema(new Schema.FieldSchema(
        getSchemaName(this.getClass().getName().toLowerCase(), input),
        bagSchema,
        DataType.BAG));
    log.debug("output schema: "+outputSchema.toString());
  } catch (FrontendException e) {
    // Wrap with the cause preserved instead of also dumping the trace to stderr.
    throw new RuntimeException(e);
  }

  Properties properties = getInstanceProperties();
  properties.put(BAG_NAMES_PROPERTY, bagNames);
  properties.put(BAG_NAME_TO_JOIN_PREFIX_PROPERTY, bagNameToJoinPrefix);
  properties.put(BAG_NAME_TO_SIZE_PROPERTY, bagNameToSize);
  return outputSchema;
}