public synchronized void invoke()

in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java [185:332]


    /**
     * Hands one incoming element to {@code sinkManager} for writing.
     *
     * <p>Routing, in the order the checks are performed:
     * <ol>
     *   <li>When the configured semantic is {@code EXACTLY_ONCE}, {@code flushPreviousState()} is called
     *       first.</li>
     *   <li>When no {@code serializer} is configured, the element is either a
     *       {@code StarRocksSinkRowDataWithMeta} (written to the database/table it names, or dropped with a
     *       warning when database, table or rows are missing) or is cast to {@code String} and written to
     *       the database/table taken from {@code sinkOptions}.</li>
     *   <li>A {@code NestedRowData} element is inspected as a serialized DDL map; several conditions on that
     *       map cause the element to be dropped.</li>
     *   <li>A {@code RowData} element is dropped when it is an {@code UPDATE_BEFORE} row in single-table mode,
     *       or a {@code DELETE} row while {@code sinkOptions.supportUpsertDelete()} is false.</li>
     *   <li>In multiple-sink mode field 0 of the row is deserialized into a JSON tree, database and table are
     *       resolved from {@code databasePattern}/{@code tablePattern}, and one record per
     *       (physical row, row kind) pair is written with an {@code __op} entry. Otherwise the row is
     *       transformed, serialized and written to the database/table taken from {@code sinkOptions}.</li>
     * </ol>
     *
     * <p>{@code totalInvokeRows} and {@code totalInvokeRowsTime} are updated on the raw-string path and when
     * the method reaches its end; the early-return paths do not update them.
     *
     * @param value the element to write
     * @param context the invocation context; not read by this method
     * @throws Exception if flushing, deserialization, DDL parsing, serialization or writing fails
     */
    public synchronized void invoke(T value, Context context) throws Exception {
        long start = System.nanoTime();
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
            flushPreviousState();
        }
        if (null == serializer) {
            if (value instanceof StarRocksSinkRowDataWithMeta) {
                StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) value;
                if (Strings.isNullOrEmpty(data.getDatabase()) || Strings.isNullOrEmpty(data.getTable())
                        || null == data.getDataRows()) {
                    LOG.warn(String.format("json row data not fullfilled. {database: %s, table: %s, dataRows: %s}",
                            data.getDatabase(), data.getTable(), data.getDataRows()));
                    return;
                }
                sinkManager.writeRecords(data.getDatabase(), data.getTable(), data.getDataRows());
                // NOTE(review): unlike the raw-string path below, this path returns without updating
                // totalInvokeRows / totalInvokeRowsTime - confirm that is intended.
                return;
            }
            // raw data sink
            sinkManager.writeRecords(sinkOptions.getDatabaseName(), sinkOptions.getTableName(), (String) value);
            totalInvokeRows.inc(1);
            totalInvokeRowsTime.inc(System.nanoTime() - start);
            return;
        }
        if (value instanceof NestedRowData) {
            // The serialized DDL map is read from offset 256 of the single backing segment.
            final int headerSize = 256;
            NestedRowData ddlData = (NestedRowData) value;
            if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < headerSize) {
                return;
            }
            int totalSize = ddlData.getSegments()[0].size();
            byte[] data = new byte[totalSize - headerSize];
            ddlData.getSegments()[0].get(headerSize, data);
            // NOTE(review): Java-native deserialization of bytes carried in the record - make sure the
            // upstream producing these rows is trusted.
            Map<String, String> ddlMap = InstantiationUtil.deserializeObject(data, HashMap.class.getClassLoader());
            if (null == ddlMap || "true".equals(ddlMap.get("snapshot")) || Strings.isNullOrEmpty(ddlMap.get("ddl"))
                    || Strings.isNullOrEmpty(ddlMap.get("databaseName"))) {
                return;
            }
            Statement stmt = CCJSqlParserUtil.parse(ddlMap.get("ddl"));
            if (stmt instanceof Truncate) {
                Truncate truncate = (Truncate) stmt;
                if (!sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) {
                    return;
                }
                // TODO: add ddl to queue
            } else if (stmt instanceof Alter) {
                // ALTER statements are recognised but nothing is done with them yet.
            }
            // NOTE(review): a DDL row that was not filtered out above falls through to the regular row
            // handling below - confirm that is intended.
        }
        if (value instanceof RowData) {
            RowKind kind = ((RowData) value).getRowKind();
            if (!multipleSink && RowKind.UPDATE_BEFORE.equals(kind)) {
                // do not need update_before, cauz an update action happened on the primary keys will be separated into
                // `delete` and `create`
                return;
            }
            if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(kind)) {
                // let go the UPDATE_AFTER and INSERT rows for tables who have a group of `unique` or `duplicate` keys.
                return;
            }
        }

        if (multipleSink) {
            GenericRowData rowData = (GenericRowData) value;
            // The format is resolved on first use and reused for later invocations.
            if (jsonDynamicSchemaFormat == null) {
                jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(
                        this.sinkMultipleFormat);
            }
            JsonNode rootNode = jsonDynamicSchemaFormat.deserialize((byte[]) rowData.getField(0));
            boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
            if (isDDL) {
                // Ignore ddl change for now
                return;
            }
            String databaseName = jsonDynamicSchemaFormat.parse(rootNode, databasePattern);
            String tableName = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);

            DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();

            String dirtyLabel = null;
            String dirtyLogTag = null;
            String dirtyIdentify = null;
            // All three patterns share one try block: a failure while resolving one of them leaves it and
            // the ones after it as null, and the write below still proceeds.
            try {
                if (dirtyOptions.ignoreDirty()) {
                    if (dirtyOptions.getLabels() != null) {
                        dirtyLabel = jsonDynamicSchemaFormat.parse(rootNode,
                                DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), DirtyType.BATCH_LOAD_ERROR,
                                        null));
                    }
                    if (dirtyOptions.getLogTag() != null) {
                        dirtyLogTag = jsonDynamicSchemaFormat.parse(rootNode,
                                DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), DirtyType.BATCH_LOAD_ERROR,
                                        null));
                    }
                    if (dirtyOptions.getIdentifier() != null) {
                        dirtyIdentify = jsonDynamicSchemaFormat.parse(rootNode,
                                DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), DirtyType.BATCH_LOAD_ERROR,
                                        null));
                    }
                }
            } catch (Exception e) {
                LOG.warn("Parse dirty options failed. {}", ExceptionUtils.stringifyException(e));
            }

            List<RowKind> rowKinds = jsonDynamicSchemaFormat.opType2RowKind(
                    jsonDynamicSchemaFormat.getOpType(rootNode));
            List<Map<String, String>> physicalDataList = jsonDynamicSchemaFormat.jsonNode2Map(
                    jsonDynamicSchemaFormat.getPhysicalData(rootNode));
            JsonNode updateBeforeNode = jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
            List<Map<String, String>> updateBeforeList = null;
            if (updateBeforeNode != null) {
                updateBeforeList = jsonDynamicSchemaFormat.jsonNode2Map(updateBeforeNode);
            }

            // Values stored under "__op"; computed once instead of once per record.
            final String upsertOp = String.valueOf(StarRocksSinkOP.UPSERT.ordinal());
            final String deleteOp = String.valueOf(StarRocksSinkOP.DELETE.ordinal());

            List<Map<String, String>> records = new ArrayList<>();
            for (int i = 0; i < physicalDataList.size(); i++) {
                for (RowKind rowKind : rowKinds) {
                    Map<String, String> record = null;
                    switch (rowKind) {
                        case INSERT:
                        case UPDATE_AFTER:
                            record = physicalDataList.get(i);
                            record.put("__op", upsertOp);
                            break;
                        case DELETE:
                            record = physicalDataList.get(i);
                            record.put("__op", deleteOp);
                            break;
                        case UPDATE_BEFORE:
                            // The before-image is emitted as a delete; skipped when no before-image exists
                            // at this index.
                            if (updateBeforeList != null && updateBeforeList.size() > i) {
                                record = updateBeforeList.get(i);
                                record.put("__op", deleteOp);
                            }
                            break;
                        default:
                            throw new RuntimeException("Unrecognized row kind:" + rowKind);
                    }
                    if (record != null) {
                        records.add(record);
                    }
                }
            }
            sinkManager.writeRecords(databaseName, tableName, records, dirtyLogTag, dirtyIdentify, dirtyLabel);
        } else {
            String record = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete()));
            sinkManager.writeRecords(sinkOptions.getDatabaseName(), sinkOptions.getTableName(), record);
        }

        totalInvokeRows.inc(1);
        totalInvokeRowsTime.inc(System.nanoTime() - start);
    }