in flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java [50:79]
/**
 * Parses a single line-protocol record into a {@link DataPoint}.
 *
 * <p>The input is tokenized and parsed with the generated {@code InfluxLineProtocolLexer} and
 * {@code InfluxLineProtocolParser}. Exactly one line must be present. The measurement is read
 * via {@code parseIdentifier}, the timestamp via {@code parseTimestamp}, and every tag pair
 * (when a tag set is present) and field pair is added to the resulting data point via
 * {@code parseTag} / {@code parseField}.
 *
 * @param input the raw text of one record
 * @return the data point built from the parsed line
 * @throws ParseException if {@code input} is {@code null}, if the parser reports syntax
 *     errors, or if the input does not contain exactly one line; the error offset is always
 *     reported as {@code 0}
 */
public static DataPoint parseToDataPoint(final String input) throws ParseException {
    // Fail with the declared exception type instead of a NullPointerException raised while
    // constructing the character stream.
    if (input == null) {
        throw new ParseException("Unable to parse line.", 0);
    }

    final CharStream charStream = new ANTLRInputStream(input);
    final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
    final TokenStream tokenStream = new CommonTokenStream(lexer);
    final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);

    // The parse must run before the error count is inspected: syntax errors are only
    // counted while parser.lines() consumes the token stream.
    final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
    if (parser.getNumberOfSyntaxErrors() != 0) {
        throw new ParseException("Unable to parse line.", 0);
    }

    // Distinguish "nothing parsed" from "too many lines" so the message matches the cause.
    // NOTE(review): the grammar may already reject empty input as a syntax error, in which
    // case this branch is purely defensive — confirm against the grammar file.
    if (lines.isEmpty()) {
        throw new ParseException("No line present; unable to parse an empty record.", 0);
    }
    if (lines.size() != 1) {
        throw new ParseException(
                "Multiple lines present; unable to parse more than one per record.", 0);
    }

    final InfluxLineProtocolParser.LineContext line = lines.get(0);
    final String measurement = parseIdentifier(line.identifier());
    final Long timestamp = parseTimestamp(line.timestamp());
    final DataPoint out = new DataPoint(measurement, timestamp);

    // The tag set is optional in the parse tree; the field set is accessed unconditionally.
    if (line.tag_set() != null) {
        line.tag_set().tag_pair().forEach(t -> parseTag(t, out));
    }
    line.field_set().field_pair().forEach(t -> parseField(t, out));
    return out;
}