private static void parse()

in druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java [119:297]


  /**
   * Parses the JSON response of a Druid query read from {@code in} and sends
   * one {@link Row} per result record to {@code sink}.
   *
   * <p>The JSON layout that is walked depends on {@code queryType}; query
   * types without a dedicated branch produce no rows. The parser created over
   * {@code in} is closed when parsing finishes or fails.
   *
   * @param queryType type of the query whose response is being parsed; selects
   *     the expected JSON layout
   * @param in stream containing the JSON response; when
   *     {@code CalciteSystemProperty.DEBUG} is set, it is read fully, echoed
   *     to standard output, and replaced by an in-memory copy
   * @param sink receiver of the rows that are built
   * @param fieldNames names of the fields of each row; its size determines the
   *     width of the rows that are built
   * @param fieldTypes types of the fields; the first entry equal to
   *     {@code JAVA_SQL_TIMESTAMP} marks the position of the timestamp field
   * @param page paging state; for {@code SELECT} its {@code pagingIdentifier},
   *     {@code offset} and {@code totalRowCount} are reset and then updated,
   *     and for {@code SCAN} its {@code totalRowCount} is incremented per row
   * @throws RuntimeException wrapping the {@link IOException} if reading or
   *     parsing fails, or wrapping the {@link InterruptedException} if the
   *     thread is interrupted; in the latter case the thread's interrupt
   *     status is restored before the exception is thrown
   */
  private static void parse(QueryType queryType, InputStream in, Sink sink,
      List<String> fieldNames, List<ColumnMetaData.Rep> fieldTypes, Page page) {
    final JsonFactory factory = new JsonFactory();
    final Row.RowBuilder rowBuilder = Row.newBuilder(fieldNames.size());

    if (CalciteSystemProperty.DEBUG.value()) {
      // Debug mode: buffer the whole response so that it can be printed, then
      // continue parsing from the buffered copy.
      try {
        final byte[] bytes = AvaticaUtils.readFullyToBytes(in);
        System.out.println("Response: "
            + new String(bytes, StandardCharsets.UTF_8)); // CHECKSTYLE: IGNORE 0
        in = new ByteArrayInputStream(bytes);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    // Index of the first field of type JAVA_SQL_TIMESTAMP, or -1 if none.
    int posTimestampField = -1;
    for (int i = 0; i < fieldTypes.size(); i++) {
      /* TODO: This needs to be revisited. The logic implies that only one
      column of type timestamp is present; this is not necessarily true,
      see https://issues.apache.org/jira/browse/CALCITE-2175
      */
      if (fieldTypes.get(i) == ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP) {
        posTimestampField = i;
        break;
      }
    }

    try (JsonParser parser = factory.createParser(in)) {
      switch (queryType) {
      case TIMESERIES:
        // Array of objects, each holding a timestamp followed by a "result"
        // object whose fields make up one row.
        if (parser.nextToken() == JsonToken.START_ARRAY) {
          while (parser.nextToken() == JsonToken.START_OBJECT) {
            // loop until token equal to "}"
            final Long timeValue = extractTimestampField(parser);
            if (parser.nextToken() == JsonToken.FIELD_NAME
                    && parser.getCurrentName().equals("result")
                    && parser.nextToken() == JsonToken.START_OBJECT) {
              if (posTimestampField != -1) {
                rowBuilder.set(posTimestampField, timeValue);
              }
              parseFields(fieldNames, fieldTypes, rowBuilder, parser);
              sink.send(rowBuilder.build());
              rowBuilder.reset();
            }
            expect(parser, JsonToken.END_OBJECT);
          }
        }
        break;

      case TOP_N:
        // Only the first object of the outer array is read: a timestamp
        // followed by a "result" array. Every object in that array becomes a
        // row carrying the same timestamp value.
        if (parser.nextToken() == JsonToken.START_ARRAY
            && parser.nextToken() == JsonToken.START_OBJECT) {
          final Long timeValue = extractTimestampField(parser);
          if (parser.nextToken() == JsonToken.FIELD_NAME
              && parser.getCurrentName().equals("result")
              && parser.nextToken() == JsonToken.START_ARRAY) {
            while (parser.nextToken() == JsonToken.START_OBJECT) {
              // loop until token equal to "}"
              if (posTimestampField != -1) {
                rowBuilder.set(posTimestampField, timeValue);
              }
              parseFields(fieldNames, fieldTypes, rowBuilder, parser);
              sink.send(rowBuilder.build());
              rowBuilder.reset();
            }
          }
        }
        break;

      case SELECT:
        // Only the first object of the outer array is read: a timestamp
        // scalar followed by a "result" object whose fields are handled one
        // by one below.
        if (parser.nextToken() == JsonToken.START_ARRAY
            && parser.nextToken() == JsonToken.START_OBJECT) {
          // Start from a clean paging state for this response.
          page.pagingIdentifier = null;
          page.offset = -1;
          page.totalRowCount = 0;
          expectScalarField(parser, DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
          if (parser.nextToken() == JsonToken.FIELD_NAME
              && parser.getCurrentName().equals("result")
              && parser.nextToken() == JsonToken.START_OBJECT) {
            while (parser.nextToken() == JsonToken.FIELD_NAME) {
              if (parser.getCurrentName().equals("pagingIdentifiers")
                  && parser.nextToken() == JsonToken.START_OBJECT) {
                // Each entry maps an identifier to an offset; if there are
                // several entries, the last one read is the one kept.
                JsonToken token = parser.nextToken();
                while (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
                  page.pagingIdentifier = parser.getCurrentName();
                  if (parser.nextToken() == JsonToken.VALUE_NUMBER_INT) {
                    page.offset = parser.getIntValue();
                  }
                  token = parser.nextToken();
                }
                expect(token, JsonToken.END_OBJECT);
              } else if (parser.getCurrentName().equals("events")
                  && parser.nextToken() == JsonToken.START_ARRAY) {
                // Each event object holds "segmentId", "offset" and an
                // "event" object whose fields make up one row.
                while (parser.nextToken() == JsonToken.START_OBJECT) {
                  expectScalarField(parser, "segmentId");
                  expectScalarField(parser, "offset");
                  if (parser.nextToken() == JsonToken.FIELD_NAME
                      && parser.getCurrentName().equals("event")
                      && parser.nextToken() == JsonToken.START_OBJECT) {
                    parseFields(fieldNames, fieldTypes, posTimestampField, rowBuilder, parser);
                    sink.send(rowBuilder.build());
                    rowBuilder.reset();
                    page.totalRowCount += 1;
                  }
                  expect(parser, JsonToken.END_OBJECT);
                }
                // NOTE(review): this consumes one more token after the end of
                // the events array, presumably the end of the "result" object;
                // confirm that "events" is always the last field of "result".
                parser.nextToken();
              } else if (parser.getCurrentName().equals("dimensions")
                  || parser.getCurrentName().equals("metrics")) {
                // The contents of these arrays are not needed; skip them.
                expect(parser, JsonToken.START_ARRAY);
                while (parser.nextToken() != JsonToken.END_ARRAY) {
                  // empty
                }
              }
            }
          }
        }
        break;

      case GROUP_BY:
        // Array of objects, each holding a "version" scalar, a timestamp and
        // an "event" object whose fields make up one row.
        if (parser.nextToken() == JsonToken.START_ARRAY) {
          while (parser.nextToken() == JsonToken.START_OBJECT) {
            expectScalarField(parser, "version");
            final Long timeValue = extractTimestampField(parser);
            if (parser.nextToken() == JsonToken.FIELD_NAME
                && parser.getCurrentName().equals("event")
                && parser.nextToken() == JsonToken.START_OBJECT) {
              if (posTimestampField != -1) {
                rowBuilder.set(posTimestampField, timeValue);
              }
              parseFields(fieldNames, fieldTypes, posTimestampField, rowBuilder, parser);
              sink.send(rowBuilder.build());
              rowBuilder.reset();
            }
            expect(parser, JsonToken.END_OBJECT);
          }
        }
        break;

      case SCAN:
        // Array of objects, each holding a "segmentId" scalar, a "columns"
        // array (skipped) and an "events" array of rows.
        if (parser.nextToken() == JsonToken.START_ARRAY) {
          while (parser.nextToken() == JsonToken.START_OBJECT) {
            expectScalarField(parser, "segmentId");

            expect(parser, JsonToken.FIELD_NAME);
            if (parser.getCurrentName().equals("columns")) {
              expect(parser, JsonToken.START_ARRAY);
              while (parser.nextToken() != JsonToken.END_ARRAY) {
                // Skip the columns list
              }
            }
            if (parser.nextToken() == JsonToken.FIELD_NAME
                && parser.getCurrentName().equals("events")
                && parser.nextToken() == JsonToken.START_ARRAY) {
              // Events is Array of Arrays where each array is a row
              while (parser.nextToken() == JsonToken.START_ARRAY) {
                // Values are positional, so read one value per requested
                // field, in the order given by fieldNames.
                for (String field : fieldNames) {
                  parseFieldForName(fieldNames, fieldTypes, posTimestampField, rowBuilder, parser,
                      field);
                }
                expect(parser, JsonToken.END_ARRAY);
                Row row = rowBuilder.build();
                sink.send(row);
                rowBuilder.reset();
                page.totalRowCount += 1;
              }
            }
            expect(parser, JsonToken.END_OBJECT);
          }
        }
        break;
      default:
        break;
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Restore the interrupt status so that code further up the stack can
      // still observe that this thread was asked to stop.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }