in streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java [89:140]
public StreamsDatum processLine(String line) {
List<String> expectedFields = fields;
if ( line.endsWith(lineDelimiter)) {
line = trimLineDelimiter(line);
}
String[] parsedFields = line.split(fieldDelimiter);
if (parsedFields.length == 0) {
return null;
}
String id = null;
DateTime ts = null;
BigInteger seq = null;
Map<String, Object> metadata = null;
String json = null;
if ( expectedFields.contains( FieldConstants.DOC )
&& parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
}
if ( expectedFields.contains( FieldConstants.ID )
&& parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
}
if ( expectedFields.contains( FieldConstants.SEQ )
&& parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
try {
seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
} catch ( NumberFormatException nfe ) {
LOGGER.warn("invalid sequence number {}", nfe);
}
}
if ( expectedFields.contains( FieldConstants.TS )
&& parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
}
if ( expectedFields.contains( FieldConstants.META )
&& parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
}
StreamsDatum datum = new StreamsDatum(json);
datum.setId(id);
datum.setTimestamp(ts);
datum.setMetadata(metadata);
datum.setSequenceid(seq);
return datum;
}