in datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java [89:140]
/**
 * Validates the input schema and derives the output schema of this function.
 *
 * <p>The input is accepted only when its first field is a BAG whose first field is a
 * TUPLE with a declared schema, and the tuple field at position {@code weightIdx} is of
 * type INTEGER, LONG, FLOAT or DOUBLE. The returned schema has a single BAG field that
 * reuses the inner schema of the input bag.
 *
 * @param input schema of the arguments passed to this function
 * @return a schema with one BAG field carrying the input bag's inner schema
 * @throws RuntimeException if the input schema does not satisfy the constraints above,
 *         or if a {@code FrontendException} is raised while reading the schema (it is
 *         attached as the cause)
 */
public Schema outputSchema(Schema input) {
  if (input == null) {
    throw new RuntimeException("Expected a BAG as input, but the input schema is null");
  }
  try {
    Schema.FieldSchema inputFieldSchema = input.getField(0);
    if (inputFieldSchema.type != DataType.BAG) {
      throw new RuntimeException("Expected a BAG as input");
    }

    Schema inputBagSchema = inputFieldSchema.schema;
    // A bag declared without an inner schema would otherwise fail below with a bare
    // NullPointerException that tells the user nothing about what is wrong.
    if (inputBagSchema == null) {
      throw new RuntimeException("The input bag has no schema");
    }

    Schema.FieldSchema bagElementSchema = inputBagSchema.getField(0);
    if (bagElementSchema.type != DataType.TUPLE) {
      throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
                                               DataType.findTypeName(bagElementSchema.type)));
    }

    Schema tupleSchema = bagElementSchema.schema;
    if (tupleSchema == null) {
      throw new RuntimeException("The tuple of input bag has no schema");
    }

    // Reject a negative index explicitly: it can never address a tuple field and would
    // otherwise surface as an IndexOutOfBoundsException from List.get().
    if (this.weightIdx < 0) {
      throw new RuntimeException("The weight field index must not be negative: " + this.weightIdx);
    }

    List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
    if (fieldSchemaList == null || fieldSchemaList.size() <= this.weightIdx) {
      throw new RuntimeException("The field schema of the input tuple is null " +
                                 "or the tuple size is no more than the weight field index: "
                                 + this.weightIdx);
    }

    // Look the weight field up once instead of once per comparison.
    Schema.FieldSchema weightFieldSchema = fieldSchemaList.get(this.weightIdx);
    if (weightFieldSchema.type != DataType.INTEGER &&
        weightFieldSchema.type != DataType.LONG &&
        weightFieldSchema.type != DataType.FLOAT &&
        weightFieldSchema.type != DataType.DOUBLE) {
      String[] expectedTypes = new String[] {DataType.findTypeName(DataType.INTEGER),
                                             DataType.findTypeName(DataType.LONG),
                                             DataType.findTypeName(DataType.FLOAT),
                                             DataType.findTypeName(DataType.DOUBLE)};
      throw new RuntimeException("Expect the type of the weight field of the input tuple to be of (" +
                                 java.util.Arrays.toString(expectedTypes) + "), but instead found (" +
                                 DataType.findTypeName(weightFieldSchema.type) + "), weight field: " +
                                 this.weightIdx);
    }

    return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
                                             inputBagSchema, DataType.BAG));
  } catch (FrontendException e) {
    // The cause is preserved on the wrapper, so the stack trace is not printed here;
    // whoever handles the RuntimeException decides how to report it.
    throw new RuntimeException(e);
  }
}