in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java [200:336]
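    // Writes each RecordWithSchema to its per-table Iceberg writer, creating or evolving the writer when the table schema changes; DDL records only report metrics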
    public void processElement(RecordWithSchema recordWithSchema) throws Exception {
        TableIdentifier tableId = recordWithSchema.getTableId();
        if (recordWithSchema.isDDL()) {
            // DDL records carry no row data; only report node metrics and return
            if (sinkMetricData != null) {
                sinkMetricData.outputMetricsWithEstimate(1);
            }
            return;
        }
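        // A schema update (typically including the first record seen for a table) rebuilds the task writer factory for that table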
        if (isSchemaUpdate(recordWithSchema)) {
            if (multipleTables.get(tableId) == null) {
                Table table = catalog.loadTable(recordWithSchema.getTableId());
                multipleTables.put(tableId, table);
            }
            // re-read writer config (upsert mode, target file size, file format) from the table properties
            Table table = multipleTables.get(recordWithSchema.getTableId());
            Map<String, String> tableProperties = table.properties();
            boolean upsertMode = PropertyUtil.propertyAsBoolean(tableProperties,
                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
            long targetFileSizeBytes = PropertyUtil.propertyAsLong(tableProperties,
                    WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
            String formatString = tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
            FileFormat fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
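            // Map primary-key column names to Iceberg field ids; these become the writer's equality fields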
            List<Integer> equalityFieldIds = recordWithSchema.getPrimaryKeys().stream()
                    .map(pk -> recordWithSchema.getSchema().findField(pk).fieldId())
                    .collect(Collectors.toList());
            // if no physical primary key exists, use all fields as the logical primary key
            if (equalityFieldIds.isEmpty() && multipleSinkOption.isPkAutoGenerated()) {
                equalityFieldIds = recordWithSchema.getSchema().columns().stream()
                        .map(NestedField::fieldId)
                        .collect(Collectors.toList());
            }
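            // Build a task writer factory bound to the current schema, file format and table properties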
            RowType flinkRowType = FlinkSchemaUtil.convert(recordWithSchema.getSchema());
            RowDataTaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(
                    table,
                    recordWithSchema.getSchema(),
                    flinkRowType,
                    targetFileSizeBytes,
                    fileFormat,
                    equalityFieldIds,
                    upsertMode,
                    appendMode,
                    false);
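            // First schema for this table: create, set up and open a dedicated single-table writer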
            if (multipleWriters.get(tableId) == null) {
                String subWriterInlongMetric = inlongMetric + DELIMITER
                        + Constants.DATABASE_NAME + "=" + tableId.namespace().toString()
                        + DELIMITER
                        + Constants.TABLE_NAME + "=" + tableId.name();
                IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
                        tableId.toString(), taskWriterFactory, subWriterInlongMetric,
                        auditHostAndPorts, flinkRowType, dirtyOptions, dirtySink, true,
                        tableSchemaRowType, metaFieldIndex, switchAppendUpsertEnable, auditKeys);
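                // Register the sub-writer with this operator's runtime context; its WriteResults are forwarded wrapped in a MultipleWriteResult tagged with the table id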
                writer.setup(getRuntimeContext(),
                        new CallbackCollector<>(
                                writeResult -> collector.collect(new MultipleWriteResult(tableId, writeResult))),
                        context);
                writer.initializeState(functionInitializationContext);
                writer.open(new Configuration());
                multipleWriters.put(tableId, writer);
            } else { // only reached when the schema evolves after the writer was created
                // Switching to the new schema may interrupt the previous file writer, so handle the evolution here
                multipleWriters.get(tableId).schemaEvolution(taskWriterFactory);
                multipleWriters.get(tableId).setFlinkRowType(flinkRowType);
            }
        }
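        // Data path: route the record's rows to the writer registered for this table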
        if (multipleWriters.get(tableId) != null) {
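            // Rows already flagged dirty upstream are only counted in dirty metrics and are not written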
            if (recordWithSchema.isDirty()) {
                String dataBaseName = tableId.namespace().toString();
                String tableName = tableId.name();
                if (sinkMetricData != null) {
                    sinkMetricData.outputDirtyMetrics(dataBaseName,
                            tableName, recordWithSchema.getRowCount(), recordWithSchema.getRowSize());
                }
            } else {
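                // Write each row, switching the writer to upsert mode for incremental data when the append/upsert switch is enabled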
                for (RowData data : recordWithSchema.getData()) {
                    String dataBaseName = tableId.namespace().toString();
                    String tableName = tableId.name();
                    long size = CalculateObjectSizeUtils.getDataSize(data);
                    try {
                        if (switchAppendUpsertEnable && recordWithSchema.isIncremental()) {
                            multipleWriters.get(tableId).switchToUpsert();
                        }
                        multipleWriters.get(tableId).processElement(data);
                    } catch (Exception e) {
                        LOG.error(String.format("write error, raw data: %s", data), e);
                        if (!dirtyOptions.ignoreDirty()) {
                            throw e;
                        }
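                        // Dirty data is tolerated: side-output the failed row to the dirty sink (if configured) and record dirty metrics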
                        if (dirtySink != null) {
                            DirtyData.Builder<Object> builder = DirtyData.builder();
                            try {
                                String dirtyLabel = DirtySinkHelper.regexReplace(dirtyOptions.getLabels(),
                                        DirtyType.BATCH_LOAD_ERROR, null,
                                        dataBaseName, tableName, null);
                                String dirtyLogTag =
                                        DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(),
                                                DirtyType.BATCH_LOAD_ERROR, null,
                                                dataBaseName, tableName, null);
                                String dirtyIdentifier =
                                        DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(),
                                                DirtyType.BATCH_LOAD_ERROR, null,
                                                dataBaseName, tableName, null);
                                builder.setData(data)
                                        .setLabels(dirtyLabel)
                                        .setLogTag(dirtyLogTag)
                                        .setIdentifier(dirtyIdentifier)
                                        .setRowType(multipleWriters.get(tableId).getFlinkRowType())
                                        .setDirtyMessage(e.getMessage());
                                dirtySink.invoke(builder.build());
                                if (sinkMetricData != null) {
                                    sinkMetricData.outputDirtyMetricsWithEstimate(dataBaseName,
                                            tableName, 1, size);
                                }
                            } catch (Exception ex) {
                                if (!dirtyOptions.ignoreSideOutputErrors()) {
                                    throw new RuntimeException(ex);
                                }
                                LOG.warn("Dirty sink failed", ex);
                            }
                        }
                        return;
                    }
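                    // Success metrics are reported only for rows that were written without error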
                    if (sinkMetricData != null) {
                        sinkMetricData.outputMetrics(dataBaseName, tableName, 1, size);
                    }
                }
            }
        } else {
            LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId());
        }
    }