in druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java [119:297]
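/** Parses Druid's JSON response stream for the given query type, emitting one
 * {@link Row} per result record to {@code sink}. Values are written in the
 * order given by {@code fieldNames} / {@code fieldTypes}; for SELECT and SCAN
 * queries, paging and row-count state is recorded in {@code page}. */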
private static void parse(QueryType queryType, InputStream in, Sink sink,
List<String> fieldNames, List<ColumnMetaData.Rep> fieldTypes, Page page) {
final JsonFactory factory = new JsonFactory();
final Row.RowBuilder rowBuilder = Row.newBuilder(fieldNames.size());
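// When tracing is enabled, buffer the whole response so it can be printed
// before parsing and then re-read from the byte array.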
if (CalciteSystemProperty.DEBUG.value()) {
try {
final byte[] bytes = AvaticaUtils.readFullyToBytes(in);
System.out.println("Response: "
+ new String(bytes, StandardCharsets.UTF_8)); // CHECKSTYLE: IGNORE 0
in = new ByteArrayInputStream(bytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
int posTimestampField = -1;
for (int i = 0; i < fieldTypes.size(); i++) {
/* TODO: This needs to be revisited. The logic assumes that only one
column of type TIMESTAMP is present, which is not necessarily true;
see https://issues.apache.org/jira/browse/CALCITE-2175.
*/
if (fieldTypes.get(i) == ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP) {
posTimestampField = i;
break;
}
}
try (JsonParser parser = factory.createParser(in)) {
switch (queryType) {
case TIMESERIES:
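// The parser expects a timeseries response of roughly the form
// [{"timestamp": "...", "result": {<field>: <value>, ...}}, ...];
// each array element becomes one row.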
if (parser.nextToken() == JsonToken.START_ARRAY) {
while (parser.nextToken() == JsonToken.START_OBJECT) {
// Read one object per iteration until the array ends
final Long timeValue = extractTimestampField(parser);
if (parser.nextToken() == JsonToken.FIELD_NAME
&& parser.getCurrentName().equals("result")
&& parser.nextToken() == JsonToken.START_OBJECT) {
if (posTimestampField != -1) {
rowBuilder.set(posTimestampField, timeValue);
}
parseFields(fieldNames, fieldTypes, rowBuilder, parser);
sink.send(rowBuilder.build());
rowBuilder.reset();
}
expect(parser, JsonToken.END_OBJECT);
}
}
break;
case TOP_N:
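// The parser expects a topN response with a single outer object whose
// "result" is an array, roughly
// [{"timestamp": "...", "result": [{<field>: <value>, ...}, ...]}];
// each element of "result" becomes a row sharing the same timestamp.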
if (parser.nextToken() == JsonToken.START_ARRAY
&& parser.nextToken() == JsonToken.START_OBJECT) {
final Long timeValue = extractTimestampField(parser);
if (parser.nextToken() == JsonToken.FIELD_NAME
&& parser.getCurrentName().equals("result")
&& parser.nextToken() == JsonToken.START_ARRAY) {
while (parser.nextToken() == JsonToken.START_OBJECT) {
// Read one object per iteration until the array ends
if (posTimestampField != -1) {
rowBuilder.set(posTimestampField, timeValue);
}
parseFields(fieldNames, fieldTypes, rowBuilder, parser);
sink.send(rowBuilder.build());
rowBuilder.reset();
}
}
}
break;
case SELECT:
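// The parser expects a select response whose "result" object carries
// "pagingIdentifiers" (segment id -> offset, kept in page so a later request
// can resume) and an "events" array whose "event" objects hold the column values.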
if (parser.nextToken() == JsonToken.START_ARRAY
&& parser.nextToken() == JsonToken.START_OBJECT) {
page.pagingIdentifier = null;
page.offset = -1;
page.totalRowCount = 0;
expectScalarField(parser, DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
if (parser.nextToken() == JsonToken.FIELD_NAME
&& parser.getCurrentName().equals("result")
&& parser.nextToken() == JsonToken.START_OBJECT) {
while (parser.nextToken() == JsonToken.FIELD_NAME) {
if (parser.getCurrentName().equals("pagingIdentifiers")
&& parser.nextToken() == JsonToken.START_OBJECT) {
JsonToken token = parser.nextToken();
while (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
page.pagingIdentifier = parser.getCurrentName();
if (parser.nextToken() == JsonToken.VALUE_NUMBER_INT) {
page.offset = parser.getIntValue();
}
token = parser.nextToken();
}
expect(token, JsonToken.END_OBJECT);
} else if (parser.getCurrentName().equals("events")
&& parser.nextToken() == JsonToken.START_ARRAY) {
while (parser.nextToken() == JsonToken.START_OBJECT) {
expectScalarField(parser, "segmentId");
expectScalarField(parser, "offset");
if (parser.nextToken() == JsonToken.FIELD_NAME
&& parser.getCurrentName().equals("event")
&& parser.nextToken() == JsonToken.START_OBJECT) {
parseFields(fieldNames, fieldTypes, posTimestampField, rowBuilder, parser);
sink.send(rowBuilder.build());
rowBuilder.reset();
page.totalRowCount += 1;
}
expect(parser, JsonToken.END_OBJECT);
}
parser.nextToken();
} else if (parser.getCurrentName().equals("dimensions")
|| parser.getCurrentName().equals("metrics")) {
expect(parser, JsonToken.START_ARRAY);
while (parser.nextToken() != JsonToken.END_ARRAY) {
// Skip over the dimension / metric names
}
}
}
}
}
break;
case GROUP_BY:
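// The parser expects a groupBy response of roughly the form
// [{"version": "v1", "timestamp": "...", "event": {<field>: <value>, ...}}, ...];
// the row values live in "event".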
if (parser.nextToken() == JsonToken.START_ARRAY) {
while (parser.nextToken() == JsonToken.START_OBJECT) {
expectScalarField(parser, "version");
final Long timeValue = extractTimestampField(parser);
if (parser.nextToken() == JsonToken.FIELD_NAME
&& parser.getCurrentName().equals("event")
&& parser.nextToken() == JsonToken.START_OBJECT) {
if (posTimestampField != -1) {
rowBuilder.set(posTimestampField, timeValue);
}
parseFields(fieldNames, fieldTypes, posTimestampField, rowBuilder, parser);
sink.send(rowBuilder.build());
rowBuilder.reset();
}
expect(parser, JsonToken.END_OBJECT);
}
}
break;
case SCAN:
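// The parser expects a scan response as an array of per-segment objects,
// roughly [{"segmentId": "...", "columns": [...], "events": [[v1, v2, ...], ...]}, ...];
// "events" holds rows as positional arrays, so values are read in
// fieldNames order rather than by key.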
if (parser.nextToken() == JsonToken.START_ARRAY) {
while (parser.nextToken() == JsonToken.START_OBJECT) {
expectScalarField(parser, "segmentId");
expect(parser, JsonToken.FIELD_NAME);
if (parser.getCurrentName().equals("columns")) {
expect(parser, JsonToken.START_ARRAY);
while (parser.nextToken() != JsonToken.END_ARRAY) {
// Skip the columns list
}
}
if (parser.nextToken() == JsonToken.FIELD_NAME
&& parser.getCurrentName().equals("events")
&& parser.nextToken() == JsonToken.START_ARRAY) {
// "events" is an array of arrays, where each inner array is one row
while (parser.nextToken() == JsonToken.START_ARRAY) {
for (String field : fieldNames) {
parseFieldForName(fieldNames, fieldTypes, posTimestampField, rowBuilder, parser,
field);
}
expect(parser, JsonToken.END_ARRAY);
Row row = rowBuilder.build();
sink.send(row);
rowBuilder.reset();
page.totalRowCount += 1;
}
}
expect(parser, JsonToken.END_OBJECT);
}
}
break;
default:
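// Other query types are not handled here; no rows are produced.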
break;
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}