in parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java [105:162]
/**
 * Writes every non-null field of {@code t} to {@code recordConsumer}, using the
 * Parquet {@code schema} and the Pig {@code pigSchema} position by position.
 *
 * <p>Null fields are skipped entirely: no {@code startField}/{@code endField} pair is
 * emitted for them. Bags and maps are written as a group wrapping a single field at
 * index 0 (the first child type of the field's group); that inner field is only
 * opened when the bag or map is non-empty. Every other Pig type is delegated to
 * {@code writeValue}.
 *
 * @param schema    Parquet group type describing the fields of the tuple
 * @param pigSchema Pig schema whose fields are read at the same indices as {@code schema}
 * @param t         tuple whose fields are written
 * @throws ExecException     propagated from reading the tuple or from nested writes
 * @throws FrontendException propagated from Pig schema access or construction
 */
private void writeTuple(GroupType schema, Schema pigSchema, Tuple t) throws ExecException, FrontendException {
  List<Type> fields = schema.getFields();
  List<FieldSchema> pigFields = pigSchema.getFields();
  assert fields.size() == pigFields.size();
  for (int i = 0; i < fields.size(); i++) {
    if (t.isNull(i)) {
      // Null field: nothing is emitted for it.
      continue;
    }
    Type fieldType = fields.get(i);
    FieldSchema pigType = pigFields.get(i);
    recordConsumer.startField(fieldType.getName(), i);
    switch (pigType.type) {
      case DataType.BAG: {
        Type bagType = fieldType.asGroupType().getType(0);
        FieldSchema pigBagInnerType = pigType.schema.getField(0);
        DataBag bag = (DataBag) t.get(i);
        recordConsumer.startGroup();
        if (bag.size() > 0) {
          // All elements of the bag are written as repeated values of one inner field.
          recordConsumer.startField(bagType.getName(), 0);
          for (Tuple tuple : bag) {
            if (bagType.isPrimitive()) {
              writeValue(bagType, pigBagInnerType, tuple, 0);
            } else {
              recordConsumer.startGroup();
              writeTuple(bagType.asGroupType(), pigBagInnerType.schema, tuple);
              recordConsumer.endGroup();
            }
          }
          recordConsumer.endField(bagType.getName(), 0);
        }
        recordConsumer.endGroup();
        break;
      }
      case DataType.MAP: {
        Type mapType = fieldType.asGroupType().getType(0);
        FieldSchema pigMapInnerType = pigType.schema.getField(0);
        // The tuple field is cast without a runtime check of the key/value types.
        @SuppressWarnings("unchecked")
        Map<String, Object> map = (Map<String, Object>) t.get(i);
        recordConsumer.startGroup();
        if (map.size() > 0) {
          // The key/value schema and the group type depend only on the map's declared
          // inner type, so they are built once here rather than once per entry.
          GroupType keyValueType = mapType.asGroupType();
          Schema keyValueSchema = new Schema(Arrays.asList(
              new FieldSchema("key", DataType.CHARARRAY),
              new FieldSchema("value", pigMapInnerType.schema, pigMapInnerType.type)));
          recordConsumer.startField(mapType.getName(), 0);
          for (Entry<String, Object> entry : map.entrySet()) {
            // Each entry is written as a two-field (key, value) tuple; a null value is
            // skipped by the null check at the top of the recursive call.
            recordConsumer.startGroup();
            writeTuple(keyValueType, keyValueSchema, TF.newTuple(Arrays.asList(entry.getKey(), entry.getValue())));
            recordConsumer.endGroup();
          }
          recordConsumer.endField(mapType.getName(), 0);
        }
        recordConsumer.endGroup();
        break;
      }
      default:
        writeValue(fieldType, pigType, t, i);
        break;
    }
    recordConsumer.endField(fieldType.getName(), i);
  }
}