in ingestion-beam/src/main/java/com/mozilla/telemetry/avro/GuidedJsonDecoder.java [159:241]
/**
 * Handles the action symbols produced by the grammar parser while decoding records.
 *
 * <p>{@code RECORD_START} pushes a new {@code Context} for the object being entered. A
 * {@code FieldAdjustAction} positions {@code in} on the value of the requested field. It does so
 * either by replaying a previously cached buffer, or by streaming forward and caching every other
 * field it passes. If the field is missing and the next symbol is a union, it substitutes a null
 * value. {@code FIELD_END} switches back to the streaming parser if a buffer was swapped in.
 * {@code RECORD_END} skips any fields the schema did not consume, then pops the context.
 *
 * @param input the input symbol; not used by this implementation
 * @param top the action symbol to handle
 * @return always {@code null}
 * @throws IOException if the underlying JSON parser fails
 * @throws AvroTypeException if a requested non-union field is missing, if the input ends before
 *     the current record is closed, or if {@code top} is not a recognized action symbol
 */
public Symbol doAction(Symbol input, Symbol top) throws IOException {
  if (top == Symbol.RECORD_START) {
    // Prepare for a new level of nesting by pushing onto the stack. The top
    // of the stack is the current depth in the document that the JsonParser
    // currently points to.
    assertCurrentToken(JsonToken.START_OBJECT, "record-start");
    recordStack.push(new Context());
    in.nextToken();
  } else if (top instanceof Symbol.FieldAdjustAction) {
    // Action that starts the process of reading data into a field as specified
    // by the DatumReader.
    Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
    Context ctx = recordStack.peek();
    // The decoder may have already parsed the field value of interest. If so,
    // it performs a context switch and replays the cached content. The
    // streaming parser is saved in ctx.jp and restored at FIELD_END.
    TokenBuffer buffer = ctx.record.get(fa.fname);
    if (buffer != null) {
      ctx.record.remove(fa.fname);
      ctx.jp = in;
      in = buffer.asParser();
      in.nextToken();
      return null;
    }
    // The parser continues to stream the document, caching the raw tokens into
    // replayable buffers.
    if (in.getCurrentToken() == JsonToken.FIELD_NAME) {
      do {
        String name = renameField(in.getValueAsString());
        in.nextToken();
        if (fa.fname.equals(name)) {
          // The parser now points at the value of the requested field.
          return null;
        }
        // Make a copy of the current JSON structure, which moves the current
        // parser to the last event that was copied. Then increment to the
        // next FIELD_NAME.
        buffer = new TokenBuffer(in);
        buffer.copyCurrentStructure(in);
        ctx.record.put(name, buffer);
        in.nextToken();
      } while (in.getCurrentToken() == JsonToken.FIELD_NAME);
    }
    if (parser.topSymbol() == Symbol.UNION) {
      // Insert the default value for a missing, nullable type. The symbol
      // set for grammars does not include the default value from the
      // schema, so we are not able to insert anything other than null.
      // However, schemas generated by the schema-transpiler will use unions
      // to specify whether a field is nullable (not required). This means
      // that null values can be inserted when missing.
      buffer = new TokenBuffer(in);
      buffer.writeNull();
      buffer.close();
      // Swap the current parser and initialize the current token.
      ctx.jp = in;
      in = buffer.asParser();
      in.nextToken();
      return null;
    }
    throw new AvroTypeException("Expected field name not found: " + fa.fname);
  } else if (top == Symbol.FIELD_END) {
    // The decoder has reached the end of a field. We switch back into the
    // original parser and continue.
    Context ctx = recordStack.peek();
    if (ctx.jp != null) {
      in = ctx.jp;
      ctx.jp = null;
    }
  } else if (top == Symbol.RECORD_END) {
    // The decoder has read all of the fields that are specified in the
    // schema. It skips any remaining fields in the parser and then returns to
    // the previous level of nesting. Values of unread fields are skipped as
    // whole structures. Stopping at the first END_OBJECT token would land
    // inside a nested object (or an object within an array) and leave the
    // parser misaligned for the enclosing record.
    JsonToken token = in.getCurrentToken();
    while (token != JsonToken.END_OBJECT) {
      if (token == null) {
        // Without this guard the loop would spin forever once the input is
        // exhausted before the record is closed.
        throw new AvroTypeException("Unexpected end of input while looking for record-end");
      }
      // No-op for scalar and FIELD_NAME tokens. For START_OBJECT/START_ARRAY it
      // advances to the matching end token so the whole value is skipped.
      in.skipChildren();
      token = in.nextToken();
    }
    recordStack.pop();
    in.nextToken();
  } else {
    throw new AvroTypeException("Unknown action symbol " + top);
  }
  return null;
}