in mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage.java [248:329]
/**
 * Transforms a stream of records into a stream of flushed {@link MantisDataFile}s.
 * <p>
 * A timer tick (emitted every {@code writerFlushFrequencyMsec}) is merged into the source
 * stream as a sentinel record; on a tick, all open writers are tracked for flushing
 * regardless of size. On a normal record, the record is routed to its partition's writer
 * (opening one on demand), and once the trigger's row-count threshold is crossed, any
 * writers over their flushable size are tracked. Tracked writers are then closed and their
 * resulting data files emitted downstream.
 * <p>
 * NOTE(review): timeout detection compares the record's schema fields against
 * {@code TIMEOUT_SCHEMA.columns()} — presumably no real record shares that schema; confirm.
 *
 * @param source stream of records to write
 * @return shared stream of data files produced by flushed writers
 */
public Observable<MantisDataFile> call(Observable<MantisRecord> source) {
    // Heartbeat stream: each tick is mapped to the sentinel TIMEOUT_RECORD so it can
    // flow through the same scan() as real records.
    Observable<MantisRecord> timer = Observable.interval(
            config.getWriterFlushFrequencyMsec(), TimeUnit.MILLISECONDS, timerScheduler)
            .map(i -> TIMEOUT_RECORD);
    return source.mergeWith(timer)
            .observeOn(transformerScheduler)
            // scan() threads a single mutable Trigger through the stream; it accumulates
            // row counts and the set of writers due for a flush.
            .scan(new Trigger(config.getWriterRowGroupSize()), (trigger, record) -> {
                if (record.getRecord().struct().fields().equals(TIMEOUT_SCHEMA.columns())) {
                    // Timeout; track all writers even if they're not yet ready to be flushed.
                    trigger.trackAll(writerPool.getWriters());
                } else {
                    StructLike partition = partitioner.partition(record.getRecord());
                    if (writerPool.isClosed(partition)) {
                        try {
                            logger.info("opening file for partition {}", partition);
                            writerPool.open(partition);
                            metrics.increment(WriterMetrics.OPEN_SUCCESS_COUNT);
                        } catch (IOException e) {
                            metrics.increment(WriterMetrics.OPEN_FAILURE_COUNT);
                            // Failing to open a writer is unrecoverable here; propagate to
                            // terminate the stream (see doOnTerminate cleanup below).
                            throw Exceptions.propagate(e);
                        }
                    }
                    try {
                        writerPool.write(partition, record);
                        trigger.increment();
                        // Check all writers to see if any are flushable and track them if so.
                        // We should check _all_ writers because a writer that should be flushable
                        // may not be flushed if an event for its partition doesn't show up for
                        // a period of time. This could cause a longer delay for that partition,
                        // especially since we only check at the trigger's count threshold.
                        if (trigger.isOverCountThreshold()) {
                            trigger.trackAll(writerPool.getFlushableWriters());
                        }
                        metrics.increment(WriterMetrics.WRITE_SUCCESS_COUNT);
                    } catch (RuntimeException e) {
                        metrics.increment(WriterMetrics.WRITE_FAILURE_COUNT);
                        // Pass the exception as the last argument so SLF4J records the
                        // stack trace; previously it was silently dropped.
                        logger.debug("error writing record {}", record, e);
                    }
                }
                return trigger;
            })
            .filter(Trigger::shouldFlush)
            .map(trigger -> {
                List<MantisDataFile> dataFiles = new ArrayList<>();
                // Timer can still tick while no writers are open (i.e., no events), which means
                // there won't be any tracked writers.
                for (StructLike partition : trigger.getTrackedWriters()) {
                    try {
                        // close() flushes the writer and returns the completed data file.
                        MantisDataFile dataFile = writerPool.close(partition);
                        dataFiles.add(dataFile);
                    } catch (IOException | RuntimeException e) {
                        metrics.increment(WriterMetrics.BATCH_FAILURE_COUNT);
                        logger.error("error writing DataFile", e);
                    }
                }
                trigger.reset();
                return dataFiles;
            })
            .filter(dataFiles -> !dataFiles.isEmpty())
            .flatMapIterable(t -> t)
            .doOnNext(dataFile -> {
                metrics.increment(WriterMetrics.BATCH_SUCCESS_COUNT);
                logger.info("writing DataFile: {}", dataFile);
                metrics.setGauge(WriterMetrics.BATCH_SIZE, dataFile.getDataFile().recordCount());
                metrics.setGauge(WriterMetrics.BATCH_SIZE_BYTES, dataFile.getDataFile().fileSizeInBytes());
            })
            .doOnTerminate(() -> {
                // Best-effort cleanup on completion or error so no writer leaks an open file.
                try {
                    logger.info("closing writer on rx terminate signal");
                    writerPool.closeAll();
                } catch (IOException e) {
                    throw Exceptions.propagate(e);
                }
            })
            // share() so multiple subscribers reuse one pipeline (writers are stateful).
            .share();
}