public Schema getOutputSchema(Schema input)

in datafu-pig/src/main/java/datafu/pig/bags/BagJoin.java [249:306]


  /**
   * Builds the output schema for this UDF from an input schema laid out as
   * alternating (bag, 'field') pairs.
   *
   * <p>Fields at even positions are treated as bags and fields at odd positions
   * are skipped. For every bag, each field of its inner tuple is added to a
   * single flat schema under the name {@code <bagName>::<innerFieldName>}, where
   * a missing alias is rendered as the literal string {@code "null"}. The flat
   * schema is then wrapped in one field of type {@code DataType.BAG}.
   *
   * <p>As a side effect, the bag names, the per-bag join prefix (as computed by
   * {@code getPrefixedAliasName}) and the per-bag inner tuple size are stored in
   * the instance properties under {@code BAG_NAMES_PROPERTY},
   * {@code BAG_NAME_TO_JOIN_PREFIX_PROPERTY} and {@code BAG_NAME_TO_SIZE_PROPERTY}.
   *
   * @param input schema of the UDF arguments
   * @return a schema holding a single bag field that contains the prefixed
   *         fields of all input bags
   * @throws RuntimeException if a field at an even position carries no schema,
   *         or if a {@code FrontendException} is raised while reading or
   *         building schemas (the exception is kept as the cause)
   */
  public Schema getOutputSchema(Schema input)
  {
    ArrayList<String> bagNames = new ArrayList<String>(input.size() / 2);
    Map<String, String> bagNameToJoinPrefix = new HashMap<String, String>(input.size() / 2);
    Map<String, Integer> bagNameToSize = new HashMap<String, Integer>(input.size() / 2);
    Schema outputSchema = null;
    Schema bagSchema = new Schema();
    try {
      // Zero-based position of the current field in the input schema. Kept
      // separate from the parity test so error messages report the real index.
      int fieldIndex = -1;
      // all even fields should be bags, odd fields are key names
      for (FieldSchema outerField : input.getFields()) {
        fieldIndex++;
        if (fieldIndex % 2 == 1)
          continue;
        String bagName = outerField.alias;
        // NOTE(review): the raw (possibly null) alias is stored here, whereas the
        // two maps below are keyed by the "null"-normalised name. Confirm that
        // the code reading these properties copes with that mismatch.
        bagNames.add(bagName);
        if (bagName == null)
          bagName = "null";
        if (outerField.schema == null)
          throw new RuntimeException("Expected input format of (bag, 'field') pairs. "
              +"Did not receive a bag at index: "+fieldIndex+", alias: "+bagName+". "
              +"Instead received type: "+DataType.findTypeName(outerField.type)+" in schema:"+input.toString());
        FieldSchema tupleField = outerField.schema.getField(0);
        String tupleName = tupleField.alias;
        bagNameToJoinPrefix.put(bagName, getPrefixedAliasName(outerField.alias, tupleName));
        if (tupleField.schema == null) {
          // Without an inner tuple schema this bag contributes no output fields
          // and gets no entry in bagNameToSize.
          log.error(String.format("could not get schema for inner tuple %s in bag %s", tupleName, bagName));
        } else {
          bagNameToSize.put(bagName, tupleField.schema.size());
          for (FieldSchema innerField : tupleField.schema.getFields()) {
            String innerFieldName = innerField.alias;
            if (innerFieldName == null)
              innerFieldName = "null";
            String outputFieldName = bagName + "::" + innerFieldName;
            if (innerField.schema != null) {
              // Nested field: carry its own schema along.
              bagSchema.add(new FieldSchema(outputFieldName, innerField.schema, innerField.type));
            } else {
              bagSchema.add(new FieldSchema(outputFieldName, innerField.type));
            }
          }
        }
      }
      outputSchema = new Schema(new Schema.FieldSchema(
                getSchemaName(this.getClass().getName().toLowerCase(), input),
                bagSchema,
                DataType.BAG));
      log.debug("output schema: "+outputSchema.toString());
    } catch (FrontendException e) {
      // Rethrow with the cause attached; the caller reports the stack trace, so
      // it is not printed to stderr here as well.
      throw new RuntimeException(e);
    }
    Properties properties = getInstanceProperties();
    properties.put(BAG_NAMES_PROPERTY, bagNames);
    properties.put(BAG_NAME_TO_JOIN_PREFIX_PROPERTY, bagNameToJoinPrefix);
    properties.put(BAG_NAME_TO_SIZE_PROPERTY, bagNameToSize);
    return outputSchema;
  }