in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java [185:332]
public synchronized void invoke(T value, Context context) throws Exception {
long start = System.nanoTime();
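// For exactly-once semantics, flush data buffered from the previous checkpoint state before handling the new record.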
if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
flushPreviousState();
}
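// No serializer configured: the record is already serialized, either wrapped with its target database/table or as a raw string.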
if (null == serializer) {
if (value instanceof StarRocksSinkRowDataWithMeta) {
StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) value;
if (Strings.isNullOrEmpty(data.getDatabase()) || Strings.isNullOrEmpty(data.getTable())
|| null == data.getDataRows()) {
LOG.warn(String.format("json row data not fullfilled. {database: %s, table: %s, dataRows: %s}",
data.getDatabase(), data.getTable(), data.getDataRows()));
return;
}
sinkManager.writeRecords(data.getDatabase(), data.getTable(), data.getDataRows());
return;
}
// raw data sink
sinkManager.writeRecords(sinkOptions.getDatabaseName(), sinkOptions.getTableName(), (String) value);
totalInvokeRows.inc(1);
totalInvokeRowsTime.inc(System.nanoTime() - start);
return;
}
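// NestedRowData is used to deliver DDL events: the bytes after the fixed 256-byte header hold a serialized metadata map.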
if (value instanceof NestedRowData) {
final int headerSize = 256;
NestedRowData ddlData = (NestedRowData) value;
if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < headerSize) {
return;
}
int totalSize = ddlData.getSegments()[0].size();
byte[] data = new byte[totalSize - headerSize];
ddlData.getSegments()[0].get(headerSize, data);
Map<String, String> ddlMap = InstantiationUtil.deserializeObject(data, HashMap.class.getClassLoader());
if (null == ddlMap || "true".equals(ddlMap.get("snapshot")) || Strings.isNullOrEmpty(ddlMap.get("ddl"))
|| Strings.isNullOrEmpty(ddlMap.get("databaseName"))) {
return;
}
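// Parse the DDL with JSqlParser; only TRUNCATE statements on the sink table are recognized at the moment.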
Statement stmt = CCJSqlParserUtil.parse(ddlMap.get("ddl"));
if (stmt instanceof Truncate) {
Truncate truncate = (Truncate) stmt;
if (!sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) {
return;
}
// TODO: add ddl to queue
} else if (stmt instanceof Alter) {
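// ALTER statements are currently not handled.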
Alter alter = (Alter) stmt;
}
}
if (value instanceof RowData) {
if (!multipleSink && RowKind.UPDATE_BEFORE.equals(((RowData) value).getRowKind())) {
// UPDATE_BEFORE is not needed: an update that changes the primary keys is split into
// `delete` and `create` events
return;
}
if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData) value).getRowKind())) {
// tables with only `unique` or `duplicate` keys do not support delete, so let UPDATE_AFTER and INSERT rows through and drop DELETE.
return;
}
}
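// Multiple-table sink: deserialize the raw JSON payload and resolve the target database and table from the configured patterns.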
if (multipleSink) {
GenericRowData rowData = (GenericRowData) value;
if (jsonDynamicSchemaFormat == null) {
jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(
this.sinkMultipleFormat);
}
JsonNode rootNode = jsonDynamicSchemaFormat.deserialize((byte[]) rowData.getField(0));
boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
if (isDDL) {
// Ignore DDL changes for now
return;
}
String databaseName = jsonDynamicSchemaFormat.parse(rootNode, databasePattern);
String tableName = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);
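// Resolve the dirty-sink label, log tag and identifier up front; they are passed along so failed rows can be routed to the dirty sink.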
DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
String dirtyLabel = null;
String dirtyLogTag = null;
String dirtyIdentify = null;
try {
if (dirtyOptions.ignoreDirty()) {
if (dirtyOptions.getLabels() != null) {
dirtyLabel = jsonDynamicSchemaFormat.parse(rootNode,
DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), DirtyType.BATCH_LOAD_ERROR,
null));
}
if (dirtyOptions.getLogTag() != null) {
dirtyLogTag = jsonDynamicSchemaFormat.parse(rootNode,
DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), DirtyType.BATCH_LOAD_ERROR,
null));
}
if (dirtyOptions.getIdentifier() != null) {
dirtyIdentify = jsonDynamicSchemaFormat.parse(rootNode,
DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), DirtyType.BATCH_LOAD_ERROR,
null));
}
}
} catch (Exception e) {
LOG.warn("Parse dirty options failed. {}", ExceptionUtils.stringifyException(e));
}
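// Map the change-log operation to row kinds and build one record per kind, tagging each with the StarRocks __op column (upsert or delete).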
List<RowKind> rowKinds = jsonDynamicSchemaFormat.opType2RowKind(
jsonDynamicSchemaFormat.getOpType(rootNode));
List<Map<String, String>> physicalDataList = jsonDynamicSchemaFormat.jsonNode2Map(
jsonDynamicSchemaFormat.getPhysicalData(rootNode));
JsonNode updateBeforeNode = jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
List<Map<String, String>> updateBeforeList = null;
if (updateBeforeNode != null) {
updateBeforeList = jsonDynamicSchemaFormat.jsonNode2Map(updateBeforeNode);
}
List<Map<String, String>> records = new ArrayList<>();
for (int i = 0; i < physicalDataList.size(); i++) {
for (RowKind rowKind : rowKinds) {
Map<String, String> record = null;
switch (rowKind) {
case INSERT:
case UPDATE_AFTER:
record = physicalDataList.get(i);
record.put("__op", String.valueOf(StarRocksSinkOP.UPSERT.ordinal()));
break;
case DELETE:
record = physicalDataList.get(i);
record.put("__op", String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
break;
case UPDATE_BEFORE:
if (updateBeforeList != null && updateBeforeList.size() > i) {
record = updateBeforeList.get(i);
record.put("__op", String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
}
break;
default:
throw new RuntimeException("Unrecognized row kind: " + rowKind);
}
if (record != null) {
records.add(record);
}
}
}
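// Hand the converted records to the sink manager for the resolved database and table, together with the dirty-sink metadata.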
sinkManager.writeRecords(databaseName, tableName, records, dirtyLogTag, dirtyIdentify, dirtyLabel);
} else {
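// Single-table sink: transform the row, serialize it with the configured serializer and pass it to the sink manager.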
String record = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete()));
sinkManager.writeRecords(sinkOptions.getDatabaseName(), sinkOptions.getTableName(), record);
}
totalInvokeRows.inc(1);
totalInvokeRowsTime.inc(System.nanoTime() - start);
}