public Observable<MantisDataFile> call(Observable<MantisRecord> source)

in mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage.java [248:329]


        public Observable<MantisDataFile> call(Observable<MantisRecord> source) {
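            // Merge a periodic sentinel record into the source stream so flush
            // checks still fire on schedule even when no new records arrive.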
            Observable<MantisRecord> timer = Observable.interval(
                    config.getWriterFlushFrequencyMsec(), TimeUnit.MILLISECONDS, timerScheduler)
                .map(i -> TIMEOUT_RECORD);

            return source.mergeWith(timer)
                .observeOn(transformerScheduler)
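                // The Trigger is the scan's accumulator: it counts successful
                // writes and remembers which partitions should be flushed next.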
                .scan(new Trigger(config.getWriterRowGroupSize()), (trigger, record) -> {
                    if (record.getRecord().struct().fields().equals(TIMEOUT_SCHEMA.columns())) {
                        // Timeout; track all writers even if they're not yet ready to be flushed.
                        trigger.trackAll(writerPool.getWriters());
                    } else {
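                        // Route the record to its partition's writer, lazily
                        // opening a new file on the first record for that partition.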
                        StructLike partition = partitioner.partition(record.getRecord());

                        if (writerPool.isClosed(partition)) {
                            try {
                                logger.info("opening file for partition {}", partition);
                                writerPool.open(partition);
                                metrics.increment(WriterMetrics.OPEN_SUCCESS_COUNT);
                            } catch (IOException e) {
                                metrics.increment(WriterMetrics.OPEN_FAILURE_COUNT);
                                throw Exceptions.propagate(e);
                            }
                        }

                        try {
                            writerPool.write(partition, record);
                            trigger.increment();
                            // Check all writers to see if any are flushable and track them if so.
                            // We should check _all_ writers because a writer that should be flushable
                            // may not be flushed if an event for its partition doesn't show up for
                            // a period of time. This could cause a longer delay for that partition,
                            // especially since we only check at the trigger's count threshold.
                            if (trigger.isOverCountThreshold()) {
                                trigger.trackAll(writerPool.getFlushableWriters());
                            }
                            metrics.increment(WriterMetrics.WRITE_SUCCESS_COUNT);
                        } catch (RuntimeException e) {
                            metrics.increment(WriterMetrics.WRITE_FAILURE_COUNT);
                            logger.debug("error writing record {}", record, e);
                        }
                    }

                    return trigger;
                })
                .filter(Trigger::shouldFlush)
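                // Close every tracked writer and emit the resulting DataFiles
                // downstream as a single batch.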
                .map(trigger -> {
                    List<MantisDataFile> dataFiles = new ArrayList<>();

                    // The timer can still tick while no writers are open (i.e., no
                    // events), in which case there are no tracked writers to close.
                    for (StructLike partition : trigger.getTrackedWriters()) {
                        try {
                            MantisDataFile dataFile = writerPool.close(partition);
                            dataFiles.add(dataFile);
                        } catch (IOException | RuntimeException e) {
                            metrics.increment(WriterMetrics.BATCH_FAILURE_COUNT);
                            logger.error("error writing DataFile", e);
                        }
                    }
                    trigger.reset();

                    return dataFiles;
                })
                .filter(dataFiles -> !dataFiles.isEmpty())
                .flatMapIterable(t -> t)
                .doOnNext(dataFile -> {
                    metrics.increment(WriterMetrics.BATCH_SUCCESS_COUNT);
                    logger.info("writing DataFile: {}", dataFile);
                    metrics.setGauge(WriterMetrics.BATCH_SIZE, dataFile.getDataFile().recordCount());
                    metrics.setGauge(WriterMetrics.BATCH_SIZE_BYTES, dataFile.getDataFile().fileSizeInBytes());
                })
                .doOnTerminate(() -> {
                    try {
                        logger.info("closing writer on rx terminate signal");
                        writerPool.closeAll();
                    } catch (IOException e) {
                        throw Exceptions.propagate(e);
                    }
                })
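                // Multicast so concurrent subscribers share a single upstream
                // subscription rather than triggering duplicate writes.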
                .share();
        }
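
For context, here is a minimal sketch of the Trigger accumulator, inferred purely from its usage in call() above; the field names, method bodies, and parameter types are assumptions, not the actual implementation (it assumes java.util.Collection, java.util.HashSet, java.util.Set, and org.apache.iceberg.StructLike):

    // Sketch only: shape inferred from the calls in call() above.
    private static class Trigger {
        private final int countThreshold;
        private final Set<StructLike> trackedWriters = new HashSet<>();
        private int counter;

        Trigger(int countThreshold) {
            this.countThreshold = countThreshold;
        }

        // Record one successful write.
        void increment() {
            counter++;
        }

        boolean isOverCountThreshold() {
            return counter >= countThreshold;
        }

        // Mark these partitions' writers as candidates for the next flush.
        void trackAll(Collection<StructLike> writers) {
            trackedWriters.addAll(writers);
        }

        // Flush only once at least one writer has been tracked.
        boolean shouldFlush() {
            return !trackedWriters.isEmpty();
        }

        Set<StructLike> getTrackedWriters() {
            return trackedWriters;
        }

        void reset() {
            counter = 0;
            trackedWriters.clear();
        }
    }

The method's signature matches RxJava 1's Observable.Transformer<MantisRecord, MantisDataFile>, so a caller would typically apply it to a record stream with recordSource.compose(transformer) and subscribe to the resulting Observable<MantisDataFile>.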