in src/org/apache/pig/newplan/logical/relational/LOGenerate.java [62:214]
/**
 * Computes the output schema of this generate operator and caches it in {@code schema}.
 * <p>
 * For each output plan {@code i}, the field schema of the plan's root expression is used. When
 * {@code flattenFlags[i]} is set and that expression is a tuple, bag or map with a known inner
 * schema, the inner fields are spliced in instead, their aliases prefixed with
 * {@code outerAlias::} (a flattened map contributes an {@code outerAlias::key} chararray field
 * followed by an {@code outerAlias::value} field). If a user-defined schema exists for plan
 * {@code i}, it is merged with the expression schema, or used on its own when the expression
 * schema is unknown.
 * <p>
 * Side effects: reinitialises {@code outputPlanSchemas}, {@code expSchemas} and
 * {@code flattenNumFields}; lazily creates {@code uidOnlySchemas} and updates its entries for
 * plans whose schema comes only from the user-defined schema.
 *
 * @return the combined schema, or {@code null} if some plan has neither an expression schema nor
 *         a user-defined schema, or if the combined schema has no fields
 * @throws FrontendException (error 1117) if a user-defined schema cannot be merged with the
 *         expression schema; exceptions from the helpers called here propagate as well. In
 *         either case nothing is cached in {@code schema}, so a later call recomputes instead of
 *         returning a partially built schema.
 */
public LogicalSchema getSchema() throws FrontendException {
    if (schema != null) {
        return schema;
    }
    if (uidOnlySchemas == null) {
        uidOnlySchemas = new ArrayList<LogicalSchema>();
        for (int i = 0; i < outputPlans.size(); i++) {
            uidOnlySchemas.add(null);
        }
    }
    // Accumulate into a local and publish to the cached field only at the end. Writing to
    // `schema` directly would leave a partially populated (non-null) schema behind if anything
    // below throws, and the early return above would then hand it out on the next call.
    LogicalSchema result = new LogicalSchema();
    outputPlanSchemas = new ArrayList<LogicalSchema>();
    expSchemas = new ArrayList<LogicalSchema>();
    // Per plan: number of fields its FLATTEN produces (left 0 when not flattened / unknown).
    flattenNumFields = new int[outputPlans.size()];
    for (int i = 0; i < outputPlans.size(); i++) {
        flattenNumFields[i] = 0;
        LogicalExpression exp = (LogicalExpression) outputPlans.get(i).getSources().get(0);

        // Deep-copy the user-defined schema so the stampFieldSchema()/merge calls below
        // operate on copies rather than on mUserDefinedSchema itself.
        LogicalSchema userSchemaCopy = null;
        if (mUserDefinedSchema != null && mUserDefinedSchema.get(i) != null) {
            userSchemaCopy = new LogicalSchema();
            for (LogicalFieldSchema fs : mUserDefinedSchema.get(i).getFields()) {
                userSchemaCopy.addField(fs.deepCopy());
            }
        }

        // Schema of the expression after flatten; null means "unknown".
        LogicalSchema expSchema = null;
        LogicalFieldSchema expFieldSchema = exp.getFieldSchema();
        if (expFieldSchema != null) {
            // Work on a copy: the alias rewriting below must not leak back into the expression.
            LogicalFieldSchema fieldSchema = expFieldSchema.deepCopy();
            boolean isComplex = fieldSchema.type == DataType.TUPLE
                    || fieldSchema.type == DataType.BAG
                    || fieldSchema.type == DataType.MAP;
            if (!isComplex || !flattenFlags[i]) {
                // Not flattened (or nothing to flatten): the expression is a single output field.
                expSchema = new LogicalSchema();
                expSchema.addField(fieldSchema);
            } else if (fieldSchema.schema != null) {
                // Flattened bag/tuple/map with a known inner schema: splice in its inner fields.
                // Without an inner schema expSchema stays null, so this plan's schema is unknown.
                expSchema = new LogicalSchema();
                List<LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalFieldSchema>();
                if (fieldSchema.type == DataType.MAP) {
                    // Assumes the map's inner schema holds exactly one field, describing the value.
                    innerFieldSchemas = fieldSchema.schema.getFields();
                    flattenNumFields[i] = 2; // used for FLATTEN(null-map)
                    LogicalFieldSchema fsForValue = innerFieldSchemas.get(0);
                    fsForValue.alias = fieldSchema.alias + "::value";
                    LogicalFieldSchema fsForKey = new LogicalFieldSchema(
                            fieldSchema.alias + "::key", null, DataType.CHARARRAY, fieldSchema.uid);
                    // The key field is added first, so it precedes the value field.
                    expSchema.addField(fsForKey);
                } else {
                    // A bag's fields are those of its first (tuple) field; a tuple's are its own.
                    LogicalSchema innerSchema = fieldSchema.type == DataType.BAG
                            ? fieldSchema.schema.getField(0).schema
                            : fieldSchema.schema;
                    if (innerSchema != null) {
                        innerFieldSchemas = innerSchema.getFields();
                        flattenNumFields[i] = innerFieldSchemas.size();
                    }
                    // Disambiguate flattened names as outerAlias::innerAlias.
                    for (LogicalFieldSchema fs : innerFieldSchemas) {
                        fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                    }
                }
                for (LogicalFieldSchema fs : innerFieldSchemas) {
                    expSchema.addField(fs);
                }
            }
        }
        // An empty expression schema carries no information; treat it as unknown.
        if (expSchema != null && expSchema.size() == 0) {
            expSchema = null;
        }
        expSchemas.add(expSchema);

        LogicalSchema planSchema = new LogicalSchema();
        if (userSchemaCopy != null) {
            LogicalSchema mergedSchema = new LogicalSchema();
            if (expSchema == null) {
                // Expression schema unknown: take the user-defined schema fields as they are.
                for (LogicalFieldSchema fs : userSchemaCopy.getFields()) {
                    fs.stampFieldSchema();
                    mergedSchema.addField(new LogicalFieldSchema(fs));
                }
            } else {
                // Merge the user-defined schema with the expression schema, then mergeUid()
                // against the expression schema.
                mergedSchema = LogicalSchema.merge(userSchemaCopy, expSchema, LogicalSchema.MergeMode.LoadForEach);
                if (mergedSchema == null) {
                    throw new FrontendException(this, "Cannot merge (" + expSchema.toString(false) +
                            ") with user defined schema (" + userSchemaCopy.toString(false) + ")", 1117);
                }
                mergedSchema.mergeUid(expSchema);
            }
            setNullTypeToByteArrayType(mergedSchema);
            for (LogicalFieldSchema fs : mergedSchema.getFields()) {
                planSchema.addField(fs);
            }
        } else if (expSchema == null) {
            // If any plan has no schema, the whole LOGenerate has no schema.
            planSchema = null;
        } else {
            for (LogicalFieldSchema fs : expSchema.getFields()) {
                planSchema.addField(fs);
            }
        }

        if (planSchema == null) {
            result = null;
            break;
        }
        for (LogicalFieldSchema fs : planSchema.getFields()) {
            result.addField(fs);
        }
        // The schema came solely from the user-defined schema: record it in uidOnlySchemas,
        // merged (via mergeUid) with whatever entry a previous computation left there.
        if (expSchema == null) {
            LogicalSchema uidOnlySchema = planSchema.mergeUid(uidOnlySchemas.get(i));
            uidOnlySchemas.set(i, uidOnlySchema);
        }
        outputPlanSchemas.add(planSchema);
    }
    if (result == null || result.size() == 0) {
        result = null;
        outputPlanSchemas = null;
    }
    schema = result;
    return schema;
}