public StreamsDatum processLine()

in streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java [89:140]


  public StreamsDatum processLine(String line) {

    List<String> expectedFields = fields;
    if ( line.endsWith(lineDelimiter)) {
      line = trimLineDelimiter(line);
    }
    String[] parsedFields = line.split(fieldDelimiter);

    if (parsedFields.length == 0) {
      return null;
    }

    String id = null;
    DateTime ts = null;
    BigInteger seq = null;
    Map<String, Object> metadata = null;
    String json = null;

    if ( expectedFields.contains( FieldConstants.DOC )
        && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
      json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
    }

    if ( expectedFields.contains( FieldConstants.ID )
        && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
      id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
    }
    if ( expectedFields.contains( FieldConstants.SEQ )
        && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
      try {
        seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
      } catch ( NumberFormatException nfe ) {
        LOGGER.warn("invalid sequence number {}", nfe);
      }
    }
    if ( expectedFields.contains( FieldConstants.TS )
        && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
      ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
    }
    if ( expectedFields.contains( FieldConstants.META )
        && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
      metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
    }

    StreamsDatum datum = new StreamsDatum(json);
    datum.setId(id);
    datum.setTimestamp(ts);
    datum.setMetadata(metadata);
    datum.setSequenceid(seq);
    return datum;

  }