public Schema getOutputSchema()

in datafu-pig/src/main/java/datafu/pig/bags/BagGroup.java [88:149]


  /**
   * Computes the output schema: a bag of {@code (group, <input bag>)} tuples.
   *
   * <p>The input must consist of exactly two bags: the bag to group, followed by a projection of
   * that bag selecting the group key field(s). If the projected bag wraps a single tuple with a
   * schema, that tuple's fields are used as the keys. As a side effect, the prefixed names of the
   * key fields are stored in {@code fieldNames} and in the instance properties under
   * {@code FIELD_NAMES_PROPERTY}.
   *
   * @param input the input schema; must hold exactly two bag fields with known schemas
   * @return a schema with a single bag field whose tuples contain a {@code group} field (the key
   *     itself for a single key, a tuple of keys for several) followed by the input bag's field
   * @throws RuntimeException if the input does not match the expected format, or wrapping any
   *     {@link FrontendException} raised while building the schema
   */
  public Schema getOutputSchema(Schema input)
  {
    try {
      if (input == null) {
        throw new RuntimeException("Expected input of format (BAG, PROJECTED_BAG...). Got no input schema.");
      }
      if (input.size() != 2) {
        throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %d fields.", input.size()));
      }
      // Expect the first field to be a bag
      FieldSchema bagFieldSchema = input.getField(0);
      if (bagFieldSchema.type != DataType.BAG) {
        throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as first field.", DataType.findTypeName(bagFieldSchema.type)));
      }
      if (bagFieldSchema.schema == null) {
        throw new RuntimeException("Expected input of format (BAG, PROJECTED_BAG...). First field is a bag with no schema.");
      }
      // Expect the second field to be a projection of the bag
      FieldSchema projectedBagFieldSchema = input.getField(1);
      if (projectedBagFieldSchema.type != DataType.BAG) {
        throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as second field.", DataType.findTypeName(projectedBagFieldSchema.type)));
      }
      if (projectedBagFieldSchema.schema == null) {
        throw new RuntimeException("Expected input of format (BAG, PROJECTED_BAG...). Second field is a bag with no schema.");
      }

      String bagName = bagFieldSchema.alias;
      // When the bag wraps a single named tuple, that tuple's alias becomes part of the key-name prefix.
      if (bagFieldSchema.schema.size() == 1) {
        FieldSchema bagTupleFieldSchema = bagFieldSchema.schema.getField(0);
        if (bagTupleFieldSchema.type == DataType.TUPLE && bagTupleFieldSchema.alias != null) {
          bagName = getPrefixedAliasName(bagName, bagTupleFieldSchema.alias);
        }
      }
      // When the projection wraps a single tuple with a schema, iterate over that tuple's fields instead.
      if (projectedBagFieldSchema.schema.size() == 1) {
        FieldSchema projectedBagTupleFieldSchema = projectedBagFieldSchema.schema.getField(0);
        if (projectedBagTupleFieldSchema.type == DataType.TUPLE && projectedBagTupleFieldSchema.schema != null) {
          projectedBagFieldSchema = projectedBagTupleFieldSchema;
        }
      }

      Schema keySchema = projectedBagFieldSchema.schema;
      int keyCount = keySchema.size();
      if (keyCount == 0) {
        // Without this check the single-key branch below would fail on getField(0).
        throw new RuntimeException("Expected input of format (BAG, PROJECTED_BAG...). Projected bag has no fields to group by.");
      }

      // create the output schema for the 'group'
      // store the field names for the group keys
      Schema groupTupleSchema = new Schema();
      fieldNames = new ArrayList<String>(keyCount);
      for (int i = 0; i < keyCount; i++) {
        FieldSchema keyFieldSchema = keySchema.getField(i);
        fieldNames.add(getPrefixedAliasName(bagName, keyFieldSchema.alias));
        // Carry the nested schema so complex keys (e.g. tuples) remain addressable downstream.
        groupTupleSchema.add(new FieldSchema(keyFieldSchema.alias, keyFieldSchema.schema, keyFieldSchema.type));
      }
      getInstanceProperties().put(FIELD_NAMES_PROPERTY, fieldNames);

      Schema outputTupleSchema = new Schema();
      if (keyCount > 1) {
        // multiple group keys: 'group' is a tuple of the keys
        outputTupleSchema.add(new FieldSchema("group", groupTupleSchema, DataType.TUPLE));
      } else {
        // single group key: 'group' is the key itself, including any nested schema
        FieldSchema singleKeySchema = groupTupleSchema.getField(0);
        outputTupleSchema.add(new FieldSchema("group", singleKeySchema.schema, singleKeySchema.type));
      }
      outputTupleSchema.add(bagFieldSchema);

      // Locale.ROOT keeps the schema name independent of the JVM's default locale.
      return new Schema(new Schema.FieldSchema(
            getSchemaName(this.getClass().getName().toLowerCase(java.util.Locale.ROOT), input),
            outputTupleSchema,
            DataType.BAG));
    } catch (FrontendException e) {
      throw new RuntimeException(e);
    }
  }