in parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java [253:391]
public RecordReaderImplementation(
MessageColumnIO root,
RecordMaterializer<T> recordMaterializer,
boolean validating,
ColumnReadStoreImpl columnStore) {
this.recordMaterializer = recordMaterializer;
this.recordRootConverter = recordMaterializer.getRootConverter();
// TODO: validator(wrap(recordMaterializer), validating, root.getType());
PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[0]);
columnReaders = new ColumnReader[leaves.length];
int[][] nextColumnIdxForRepLevel = new int[leaves.length][];
int[][] levelToClose = new int[leaves.length][];
GroupConverter[][] groupConverterPaths = new GroupConverter[leaves.length][];
PrimitiveConverter[] leafConverters = new PrimitiveConverter[leaves.length];
int[] firstIndexForLevel = new int[256]; // "256 levels of nesting ought to be enough for anybody"
// build the automaton
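// Each leaf column becomes one state of the automaton. For every repetition level the
// *next* value can have, we precompute which column to read next (nextColumnIdxForRepLevel,
// where an index of leaves.length means "end of record") and the field-path level down to
// which nested groups must be closed before moving on (levelToClose).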
for (int i = 0; i < leaves.length; i++) {
PrimitiveColumnIO leafColumnIO = leaves[i];
// generate converters along the path from root to leaf
final int[] indexFieldPath = leafColumnIO.getIndexFieldPath();
groupConverterPaths[i] = new GroupConverter[indexFieldPath.length - 1];
GroupConverter current = this.recordRootConverter;
for (int j = 0; j < indexFieldPath.length - 1; j++) {
current = current.getConverter(indexFieldPath[j]).asGroupConverter();
groupConverterPaths[i][j] = current;
}
leafConverters[i] = current.getConverter(indexFieldPath[indexFieldPath.length - 1])
.asPrimitiveConverter();
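// cache the converters along the path (group converters for the intermediate levels, a
// primitive converter for the leaf) so assembly does not walk the converter tree per value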
columnReaders[i] = columnStore.getColumnReader(leafColumnIO.getColumnDescriptor());
int maxRepetitionLevel = leafColumnIO.getRepetitionLevel();
nextColumnIdxForRepLevel[i] = new int[maxRepetitionLevel + 1];
levelToClose[i] = new int[maxRepetitionLevel + 1]; // per next rep level: the level to return to before reading on
for (int nextRepLevel = 0; nextRepLevel <= maxRepetitionLevel; ++nextRepLevel) {
// remember which is the first for this level
if (leafColumnIO.isFirst(nextRepLevel)) {
firstIndexForLevel[nextRepLevel] = i;
}
int nextColIdx;
// TODO: when we use nextColumnIdxForRepLevel, should we provide the current rep level or the rep level for the next item?
// figure out automaton transition
if (nextRepLevel == 0) { // 0 always means jump to the next (the last one being a special case)
nextColIdx = i + 1;
} else if (leafColumnIO.isLast(nextRepLevel)) {
    // we are at the last column of this repetition level => jump back to the first one
    nextColIdx = firstIndexForLevel[nextRepLevel];
} else { // otherwise move on to the next column
nextColIdx = i + 1;
}
// figure out which level down the tree we need to go back
if (nextColIdx == leaves.length) { // reached the end of the record => close all levels
levelToClose[i][nextRepLevel] = 0;
} else if (leafColumnIO.isLast(nextRepLevel)) { // reached the end of this level => close the repetition level
ColumnIO parent = leafColumnIO.getParent(nextRepLevel);
levelToClose[i][nextRepLevel] = parent.getFieldPath().length - 1;
} else { // otherwise close down to the common parent shared with the next column
levelToClose[i][nextRepLevel] =
getCommonParentLevel(leafColumnIO.getFieldPath(), leaves[nextColIdx].getFieldPath());
}
// sanity check: closing deeper than the leaf's own path would be a bug
if (levelToClose[i][nextRepLevel] > leaves[i].getFieldPath().length - 1) {
throw new ParquetEncodingException(Arrays.toString(leaves[i].getFieldPath()) + " -(" + nextRepLevel
+ ")-> " + levelToClose[i][nextRepLevel]);
}
nextColumnIdxForRepLevel[i][nextRepLevel] = nextColIdx;
}
}
states = new State[leaves.length];
for (int i = 0; i < leaves.length; i++) {
states[i] = new State(
i, leaves[i], columnReaders[i], levelToClose[i], groupConverterPaths[i], leafConverters[i]);
int[] definitionLevelToDepth = new int[states[i].primitiveColumnIO.getDefinitionLevel() + 1];
// for each possible definition level, determine the depth at which to create groups
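// e.g. for msg / a (optional group) / b (optional leaf), max definition level 2:
//   d=0 -> depth -1 (a is missing: no groups to start)
//   d=1 -> depth  0 (start group a; the leaf b is null)
//   d=2 -> depth  0 (start group a; the leaf value is defined and will be written)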
final ColumnIO[] path = states[i].primitiveColumnIO.getPath();
int depth = 0;
for (int d = 0; d < definitionLevelToDepth.length; ++d) {
while (depth < (states[i].fieldPath.length - 1) && d >= path[depth + 1].getDefinitionLevel()) {
++depth;
}
definitionLevelToDepth[d] = depth - 1;
}
states[i].definitionLevelToDepth = definitionLevelToDepth;
}
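// link the states through the transition table; an index equal to states.length marks
// the end of the record and becomes a null transition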
for (int i = 0; i < leaves.length; i++) {
State state = states[i];
int[] nextStateIds = nextColumnIdxForRepLevel[i];
state.nextState = new State[nextStateIds.length];
for (int j = 0; j < nextStateIds.length; j++) {
state.nextState[j] = nextStateIds[j] == states.length ? null : states[nextStateIds[j]];
}
}
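// precompute the Case lookup: for every (current level, definition level, next repetition
// level) triple, intern an equivalent Case so identical transitions share a single id,
// keeping the distinct cases split by whether the leaf value is defined (non-null) or not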
for (int i = 0; i < states.length; i++) {
State state = states[i];
final Map<Case, Case> definedCases = new HashMap<>();
final Map<Case, Case> undefinedCases = new HashMap<>();
Case[][][] caseLookup = new Case[state.fieldPath.length][][];
for (int currentLevel = 0; currentLevel < state.fieldPath.length; ++currentLevel) {
caseLookup[currentLevel] = new Case[state.maxDefinitionLevel + 1][];
for (int d = 0; d <= state.maxDefinitionLevel; ++d) {
caseLookup[currentLevel][d] = new Case[state.maxRepetitionLevel + 1];
for (int nextR = 0; nextR <= state.maxRepetitionLevel; ++nextR) {
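// caseStartLevel: the level assembly is at when this case applies;
// caseDepth: how deep groups are opened for definition level d (at least caseStartLevel - 1);
// caseNextLevel: the level to close back down to before following the transition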
int caseStartLevel = currentLevel;
int caseDepth = Math.max(state.getDepth(d), caseStartLevel - 1);
int caseNextLevel = Math.min(state.nextLevel[nextR], caseDepth + 1);
Case currentCase = new Case(
caseStartLevel,
caseDepth,
caseNextLevel,
getNextReader(state.id, nextR),
d == state.maxDefinitionLevel);
Map<Case, Case> cases = currentCase.isDefined() ? definedCases : undefinedCases;
if (!cases.containsKey(currentCase)) {
currentCase.setID(cases.size());
cases.put(currentCase, currentCase);
} else {
currentCase = cases.get(currentCase);
}
caseLookup[currentLevel][d][nextR] = currentCase;
}
}
}
state.caseLookup = caseLookup;
state.definedCases = new ArrayList<>(definedCases.values());
state.undefinedCases = new ArrayList<>(undefinedCases.values());
Comparator<Case> caseComparator = Comparator.comparingInt((Case c) -> c.id);
Collections.sort(state.definedCases, caseComparator);
Collections.sort(state.undefinedCases, caseComparator);
}
}
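
For context, a condensed sketch of how these precomputed tables drive assembly, paraphrased from this class's read() loop. Member names such as currentState.column and currentState.groupConverterPath are assumed from the State fields assigned above; validation and record skipping are elided:

    int currentLevel = 0;
    recordRootConverter.start();
    State currentState = states[0];
    do {
        ColumnReader columnReader = currentState.column;
        int d = columnReader.getCurrentDefinitionLevel();
        // open the nested groups required for this definition level
        for (int depth = currentState.definitionLevelToDepth[d]; currentLevel <= depth; ++currentLevel) {
            currentState.groupConverterPath[currentLevel].start();
        }
        // write a value only when the leaf itself is defined (not null)
        if (d >= currentState.maxDefinitionLevel) {
            columnReader.writeCurrentValueToConverter();
        }
        columnReader.consume();
        // the next value's repetition level selects the transition
        int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel();
        // close groups back down to the precomputed level, then follow the transition
        for (int next = currentState.nextLevel[nextR]; currentLevel > next; currentLevel--) {
            currentState.groupConverterPath[currentLevel - 1].end();
        }
        currentState = currentState.nextState[nextR];
    } while (currentState != null);
    recordRootConverter.end();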