public void processElement()

in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java [200:336]


    public void processElement(RecordWithSchema recordWithSchema) throws Exception {
        TableIdentifier tableId = recordWithSchema.getTableId();
        if (recordWithSchema.isDDL()) {
            // DDL records carry no data to write; only report node-level metrics
            if (sinkMetricData != null) {
                sinkMetricData.outputMetricsWithEstimate(1);
            }
            return;
        }
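        // A schema update (including the first record of a previously unseen table) rebuilds the per-table writer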
        if (isSchemaUpdate(recordWithSchema)) {
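            // Load and cache the Iceberg table on first access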
            if (multipleTables.get(tableId) == null) {
                Table table = catalog.loadTable(tableId);
                multipleTables.put(tableId, table);
            }

            // refresh the runtime table properties that control how files are written
            Table table = multipleTables.get(tableId);
            Map<String, String> tableProperties = table.properties();
            boolean upsertMode = PropertyUtil.propertyAsBoolean(tableProperties,
                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
            long targetFileSizeBytes = PropertyUtil.propertyAsLong(tableProperties,
                    WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
            String formatString = tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
            FileFormat fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
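            // Map primary-key column names to Iceberg field ids; these become the equality fields used for upsert deletes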
            List<Integer> equalityFieldIds = recordWithSchema.getPrimaryKeys().stream()
                    .map(pk -> recordWithSchema.getSchema().findField(pk).fieldId())
                    .collect(Collectors.toList());
            // if no physical primary key exists, use all fields as the logical primary key
            if (equalityFieldIds.isEmpty() && multipleSinkOption.isPkAutoGenerated()) {
                equalityFieldIds = recordWithSchema.getSchema().columns().stream()
                        .map(NestedField::fieldId)
                        .collect(Collectors.toList());
            }
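            // Convert the Iceberg schema to Flink's RowType and rebuild the task writer factory with the refreshed settings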
            RowType flinkRowType = FlinkSchemaUtil.convert(recordWithSchema.getSchema());
            RowDataTaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(
                    table,
                    recordWithSchema.getSchema(),
                    flinkRowType,
                    targetFileSizeBytes,
                    fileFormat,
                    equalityFieldIds,
                    upsertMode,
                    appendMode,
                    false);

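            // First record for this table: create a dedicated sub-writer and drive it through the operator lifecycle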
            if (multipleWriters.get(tableId) == null) {
                String subWriterInlongMetric = inlongMetric + DELIMITER
                        + Constants.DATABASE_NAME + "=" + tableId.namespace().toString()
                        + DELIMITER
                        + Constants.TABLE_NAME + "=" + tableId.name();
                IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
                        tableId.toString(), taskWriterFactory, subWriterInlongMetric,
                        auditHostAndPorts, flinkRowType, dirtyOptions, dirtySink, true,
                        tableSchemaRowType, metaFieldIndex, switchAppendUpsertEnable, auditKeys);
                writer.setup(getRuntimeContext(),
                        new CallbackCollector<>(
                                writeResult -> collector.collect(new MultipleWriteResult(tableId, writeResult))),
                        context);
                writer.initializeState(functionInitializationContext);
                writer.open(new Configuration());
                multipleWriters.put(tableId, writer);
            } else { // reached only when the schema evolves after the writer already exists
                // Switching to a new schema may interrupt the previous file writer, so handle the evolution here
                multipleWriters.get(tableId).schemaEvolution(taskWriterFactory);
                multipleWriters.get(tableId).setFlinkRowType(flinkRowType);
            }

        }

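        // Dispatch the record batch to this table's sub-writer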
        if (multipleWriters.get(tableId) != null) {
            if (recordWithSchema.isDirty()) {
                String dataBaseName = tableId.namespace().toString();
                String tableName = tableId.name();
                if (sinkMetricData != null) {
                    sinkMetricData.outputDirtyMetrics(dataBaseName,
                            tableName, recordWithSchema.getRowCount(), recordWithSchema.getRowSize());
                }
            } else {
                for (RowData data : recordWithSchema.getData()) {
                    String dataBaseName = tableId.namespace().toString();
                    String tableName = tableId.name();
                    long size = CalculateObjectSizeUtils.getDataSize(data);

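                    // With append/upsert switching enabled, flip the writer to upsert mode once incremental data arrives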
                    try {
                        if (switchAppendUpsertEnable && recordWithSchema.isIncremental()) {
                            multipleWriters.get(tableId).switchToUpsert();
                        }
                        multipleWriters.get(tableId).processElement(data);
                    } catch (Exception e) {
                        LOG.error("write error, raw data: {}", data, e);
                        if (!dirtyOptions.ignoreDirty()) {
                            throw e;
                        }
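                        // Divert the failed row to the dirty sink, resolving label, log tag and identifier patterns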
                        if (dirtySink != null) {
                            DirtyData.Builder<Object> builder = DirtyData.builder();
                            try {
                                String dirtyLabel = DirtySinkHelper.regexReplace(dirtyOptions.getLabels(),
                                        DirtyType.BATCH_LOAD_ERROR, null,
                                        dataBaseName, tableName, null);
                                String dirtyLogTag =
                                        DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(),
                                                DirtyType.BATCH_LOAD_ERROR, null,
                                                dataBaseName, tableName, null);
                                String dirtyIdentifier =
                                        DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(),
                                                DirtyType.BATCH_LOAD_ERROR, null,
                                                dataBaseName, tableName, null);
                                builder.setData(data)
                                        .setLabels(dirtyLabel)
                                        .setLogTag(dirtyLogTag)
                                        .setIdentifier(dirtyIdentifier)
                                        .setRowType(multipleWriters.get(tableId).getFlinkRowType())
                                        .setDirtyMessage(e.getMessage());
                                dirtySink.invoke(builder.build());
                                if (sinkMetricData != null) {
                                    sinkMetricData.outputDirtyMetricsWithEstimate(dataBaseName,
                                            tableName, 1, size);
                                }
                            } catch (Exception ex) {
                                if (!dirtyOptions.ignoreSideOutputErrors()) {
                                    throw new RuntimeException(ex);
                                }
                                LOG.warn("Dirty sink failed", ex);
                            }
                        }
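                        // Skip the remaining rows of this batch once a row has failed and been handled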
                        return;
                    }

                    if (sinkMetricData != null) {
                        sinkMetricData.outputMetrics(dataBaseName, tableName, 1, size);
                    }
                }
            }
        } else {
            LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId());
        }
    }