in inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java [140:236]
public void invoke(T value, Context context)
throws IOException, ClassNotFoundException, JSQLParserException {
if (serializer == null) {
if (value instanceof StarRocksSinkRowDataWithMeta) {
StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) value;
if (Strings.isNullOrEmpty(data.getDatabase())
|| Strings.isNullOrEmpty(data.getTable())
|| data.getDataRows() == null) {
log.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows())));
return;
}
sinkManager.write(null, data.getDatabase(), data.getTable(), data.getDataRows());
return;
} else if (value instanceof StarRocksRowData) {
StarRocksRowData data = (StarRocksRowData) value;
if (Strings.isNullOrEmpty(data.getDatabase())
|| Strings.isNullOrEmpty(data.getTable())
|| data.getRow() == null) {
log.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
data.getDatabase(), data.getTable(), data.getRow()));
return;
}
sinkManager.write(data.getUniqueKey(), data.getDatabase(), data.getTable(), data.getRow());
return;
}
// raw data sink
sinkManager.write(null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), value.toString());
return;
}
if (value instanceof NestedRowData) {
NestedRowData ddlData = (NestedRowData) value;
if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < NESTED_ROW_DATA_HEADER_SIZE) {
return;
}
int totalSize = ddlData.getSegments()[0].size();
byte[] data = new byte[totalSize - NESTED_ROW_DATA_HEADER_SIZE];
ddlData.getSegments()[0].get(NESTED_ROW_DATA_HEADER_SIZE, data);
Map<String, String> ddlMap = InstantiationUtil.deserializeObject(data, HashMap.class.getClassLoader());
if (ddlMap == null
|| "true".equals(ddlMap.get("snapshot"))
|| Strings.isNullOrEmpty(ddlMap.get("ddl"))
|| Strings.isNullOrEmpty(ddlMap.get("databaseName"))) {
return;
}
Statement statement = CCJSqlParserUtil.parse(ddlMap.get("ddl"));
if (statement instanceof Truncate) {
Truncate truncate = (Truncate) statement;
if (!sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) {
return;
}
// TODO: add ddl to queue
} else if (statement instanceof Alter) {
}
}
if (value instanceof RowData) {
if (RowKind.UPDATE_BEFORE.equals(((RowData) value).getRowKind())) {
// do not need update_before, cause an update action happened on the primary keys will be separated into
// `delete` and `create`
return;
}
if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData) value).getRowKind())) {
// let go the UPDATE_AFTER and INSERT rows for tables who have a group of `unique` or `duplicate` keys.
return;
}
}
flushLegacyData();
Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());
long serializeStartTime = System.currentTimeMillis();
String serializedData;
try {
serializedData = serializer.serialize(schemaUtils.filterOutTimeField(data));
} catch (Exception e) {
log.error("Failed to serialize data", e);
if (sinkExactlyMetric != null) {
sinkExactlyMetric.incNumSerializeError();
}
return;
}
if (sinkExactlyMetric != null) {
sinkExactlyMetric.incNumSerializeSuccess();
sinkExactlyMetric.recordSerializeDelay(System.currentTimeMillis() - serializeStartTime);
}
sinkManager.write(
null,
sinkOptions.getDatabaseName(),
sinkOptions.getTableName(),
serializedData);
ouputMetrics(value, data);
}