public void onEvent()

in streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java [124:181]


  public void onEvent(Event event) throws SpRuntimeException {
    if (event == null) {
      return;
    }

    final AbstractField timestampAbstractField = event.getFieldBySelector(timestampFieldId);
    final Long timestamp = timestampAbstractField.getAsPrimitive().getAsLong();
    if (timestamp == null) {
      return;
    }

    final Map<String, Object> measurementValuePairs = event.getRaw();
    // should be at least a timestamp field and a measurement field
    if (measurementValuePairs.size() <= 1) {
      return;
    }

    final int measurementFieldCount = measurementValuePairs.size() - 1;
    final List<String> measurements = new ArrayList<>(measurementFieldCount);
    final List<TSDataType> types = new ArrayList<>(measurementFieldCount);
    final List<Object> values = new ArrayList<>(measurementFieldCount);

    for (Map.Entry<String, Object> measurementValuePair : measurementValuePairs.entrySet()) {
      if (timestampAbstractField.getFieldNameIn().equals(measurementValuePair.getKey())) {
        continue;
      }

      measurements.add(measurementValuePair.getKey());

      final Object value = measurementValuePair.getValue();
      if (value instanceof Integer) {
        types.add(TSDataType.INT32);
        values.add(value);
      } else if (value instanceof Long) {
        types.add(TSDataType.INT64);
        values.add(value);
      } else if (value instanceof Float) {
        types.add(TSDataType.FLOAT);
        values.add(value);
      } else if (value instanceof Double) {
        types.add(TSDataType.DOUBLE);
        values.add(value);
      } else if (value instanceof Boolean) {
        types.add(TSDataType.BOOLEAN);
        values.add(value);
      } else {
        types.add(TSDataType.TEXT);
        values.add(value);
      }
    }

    try {
      sessionPool.insertRecord(deviceId, timestamp, measurements, types, values);
    } catch (IoTDBConnectionException | StatementExecutionException e) {
      LOG.error("Failed to save event to IoTDB, because: " + e.getMessage());
      e.printStackTrace();
    }
  }