public Symbol doAction()

in ingestion-beam/src/main/java/com/mozilla/telemetry/avro/GuidedJsonDecoder.java [159:241]


  public Symbol doAction(Symbol input, Symbol top) throws IOException {
    if (top == Symbol.RECORD_START) {
      // Prepare for a new level of nesting by pushing onto the stack. The top
      // of the stack is the current depth in the document that the JsonParser
      // currently points to.
      assertCurrentToken(JsonToken.START_OBJECT, "record-start");
      recordStack.push(new Context());
      in.nextToken();
    } else if (top instanceof Symbol.FieldAdjustAction) {
      // Action the starts the process of reading data into a field as specified
      // by the DatumReader.
      Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
      Context ctx = recordStack.peek();

      // The decoder may have already parsed the field value of interest. It
      // performs a context switch and replays the cached content.
      TokenBuffer buffer = ctx.record.get(fa.fname);
      if (buffer != null) {
        ctx.record.remove(fa.fname);
        ctx.jp = in;
        in = buffer.asParser();
        in.nextToken();
        return null;
      }
      // The parser continues to stream the document, caching the raw tokens into
      // replayable buffers.
      if (in.getCurrentToken() == JsonToken.FIELD_NAME) {
        do {
          String name = renameField(in.getValueAsString());
          in.nextToken();
          if (fa.fname.equals(name)) {
            return null;
          }
          // Make a copy of the current JSON structure, which moves the current
          // parser to the last event that was copied. Then increment to the
          // next FIELD_NAME.
          buffer = new TokenBuffer(in);
          buffer.copyCurrentStructure(in);
          ctx.record.put(name, buffer);
          in.nextToken();
        } while (in.getCurrentToken() == JsonToken.FIELD_NAME);
      }
      if (parser.topSymbol() == Symbol.UNION) {
        // Insert the default value for a missing, nullable type. The symbol
        // set for grammars does not include the default value from the
        // schema, so we are not able to insert anything other than null.
        // However, schemas generated by the schema-transpiler will use unions
        // to specify whether a field is nullable (not required). This means
        // that null values can be inserted when missing.
        buffer = new TokenBuffer(in);
        buffer.writeNull();
        buffer.close();

        // Swap the current parser and initialize the current token.
        ctx.jp = in;
        in = buffer.asParser();
        in.nextToken();

        return null;
      }
      throw new AvroTypeException("Expected field name not found: " + fa.fname);
    } else if (top == Symbol.FIELD_END) {
      // The decoder has reached the end of a field. We switch back into the
      // original parser and continue.
      Context ctx = recordStack.peek();
      if (ctx.jp != null) {
        in = ctx.jp;
        ctx.jp = null;
      }
    } else if (top == Symbol.RECORD_END) {
      // The decoder has read all of the fields that are specified in the
      // schema. It skips any remaining fields in the parser and then returns to
      // the previous level of nesting.
      while (in.getCurrentToken() != JsonToken.END_OBJECT) {
        in.nextToken();
      }
      recordStack.pop();
      in.nextToken();
    } else {
      throw new AvroTypeException("Unknown action symbol " + top);
    }
    return null;
  }