in streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/tsfile/TsFileSink.java [164:266]
/**
 * Writes one incoming event as a single-row {@link Tablet} through the current
 * {@code tsFileWriter}.
 *
 * <p>Events that are {@code null}, lack a timestamp, or carry no more than one raw field are
 * logged and dropped. Schema measurements that are missing from the event are marked in the
 * tablet's bitmap instead of being written.
 *
 * <p>After a successful write the estimated payload size is added to {@code totalWriteSize} and
 * {@code writeSize}; the writer is reset once {@code totalWriteSize} reaches
 * {@code maxTsFileSize}, and chunk groups are flushed once {@code writeSize} reaches
 * {@code maxFlushDiskSize}.
 *
 * @param event the event to persist; may be {@code null}, in which case it is ignored
 * @throws SpRuntimeException if writing the tablet, flushing, or resetting the writer fails
 * @throws UnsupportedOperationException if a schema declares a data type not handled here
 */
public void onEvent(Event event) {
  if (event == null) {
    log.info("Received null event");
    return;
  }
  final AbstractField timestampAbstractField = event.getFieldBySelector(timestampFieldId);
  // Guard against a missing timestamp field so the event is dropped instead of failing with an NPE.
  if (timestampAbstractField == null) {
    log.info("Received event without timestamp field");
    return;
  }
  final Long timestamp = timestampAbstractField.getAsPrimitive().getAsLong();
  if (timestamp == null) {
    log.info("Received event with null timestamp");
    return;
  }
  final Map<String, Object> measurementValuePairs = event.getRaw();
  // There should be at least a timestamp field and one measurement field.
  if (measurementValuePairs.size() <= 1) {
    log.info("Received event with insufficient measurement value pairs");
    return;
  }

  // NOTE(review): the tablet's row count is never set explicitly here; confirm that the Tablet
  // implementation in use does not require it for a one-row write.
  final Tablet tablet = new Tablet(deviceId, schemas, 1);
  /*
   We need to know the size of the file to decide when to flush data to disk and when to
   start a new file because the current one has grown too large.
   However, the length reported for the file on disk does not reflect the actual file size,
   so we estimate it by summing the sizes of the values written. For example,
   if we write a value whose type occupies 8 bytes, we add 8 to the estimate.
   */
  int size = 0;
  tablet.timestamps[0] = timestamp;
  for (int i = 0; i < schemas.size(); i++) {
    final MeasurementSchema schema = tablet.getSchemas().get(i);
    final AbstractField fieldByRuntimeName = event.getFieldByRuntimeName(schema.getMeasurementId());
    if (fieldByRuntimeName == null) {
      // No value for this measurement in the event: flag row 0 of this column in the bitmap.
      // NOTE(review): assumes tablet.bitMaps has already been allocated - confirm.
      tablet.bitMaps[i].mark(0);
      continue;
    }
    switch (schema.getType()) {
      case BOOLEAN:
        size += BOOLEAN_SIZE;
        ((boolean[]) tablet.values[i])[0] = fieldByRuntimeName.getAsPrimitive().getAsBoolean();
        break;
      case INT32:
        size += INIEGER_SIZE;
        ((int[]) tablet.values[i])[0] = fieldByRuntimeName.getAsPrimitive().getAsInt();
        break;
      case INT64:
        size += LONG_SIZE;
        ((long[]) tablet.values[i])[0] = fieldByRuntimeName.getAsPrimitive().getAsLong();
        break;
      case FLOAT:
        size += FLOAT_SIZE;
        ((float[]) tablet.values[i])[0] = fieldByRuntimeName.getAsPrimitive().getAsFloat();
        break;
      case DOUBLE:
        size += DOUBLE_SIZE;
        ((double[]) tablet.values[i])[0] = fieldByRuntimeName.getAsPrimitive().getAsDouble();
        break;
      case STRING:
        String sValue = fieldByRuntimeName.getAsPrimitive().getAsString();
        // The character count is used as the size estimate for text values.
        size += sValue.length();
        ((String[]) tablet.values[i])[0] = sValue;
        break;
      default:
        throw new UnsupportedOperationException("Unsupported data type: " + schema.getType());
    }
  }

  try {
    // An event older than the last written one triggers a writer reset before it is written.
    if (maxTime > timestamp) {
      log.info("The file size did not reach the expected size, "
          + "but due to the time taken to write the measurement point being less than the previous writing time,"
          + " the file needs to be closed in advance");
      resetTsFileWriter();
      maxTime = Long.MIN_VALUE;
    }
    try {
      if (aligned) {
        tsFileWriter.writeAligned(tablet);
      } else {
        tsFileWriter.write(tablet);
      }
      totalWriteSize += size;
      writeSize += size;
      maxTime = timestamp;
    } catch (WriteProcessException | IOException e) {
      final SpRuntimeException writeFailure = new SpRuntimeException("Failed to write TSRecord", e);
      try {
        resetTsFileWriter();
      } catch (Exception resetFailure) {
        // Best-effort cleanup: a failing reset must not hide the write failure that caused it,
        // so it is attached as a suppressed exception instead of replacing the primary one.
        writeFailure.addSuppressed(resetFailure);
      }
      throw writeFailure;
    }
    if (totalWriteSize >= maxTsFileSize) {
      resetTsFileWriter();
      return;
    }
    if (writeSize >= maxFlushDiskSize) {
      tsFileWriter.flushAllChunkGroups();
      writeSize = 0;
    }
  } catch (IOException e) {
    // Reached for I/O failures of the resets above as well as of the chunk-group flush.
    throw new SpRuntimeException("Failed to resetTsFileWriter" , e);
  }
}