public void run()

in adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java [70:118]


    /**
     * Main loop of the transfer worker: runs until {@code stopped} is set.
     *
     * <p>Each iteration:
     * <ol>
     *   <li>requests a batch of pulled records, grouped by runner name, from
     *       {@code circulatorContext.takeEventRecords(batchSize)};</li>
     *   <li>looks up the current transform engines from
     *       {@code circulatorContext.getTaskTransformMap()};</li>
     *   <li>transforms every record asynchronously via
     *       {@link CompletableFuture#supplyAsync(java.util.function.Supplier)} (no explicit
     *       executor is supplied) and waits for all of them to finish;</li>
     *   <li>offers the non-null transform results to the target task queue.</li>
     * </ol>
     *
     * <p>A record whose transform throws is passed to {@code errorHandler}; a record whose
     * transform result is {@code null} (which includes the failed ones) has its offset committed
     * through {@code offsetManager}. Any exception escaping an iteration is logged and the records
     * collected so far in that iteration are passed to {@code errorHandler}; the loop then
     * continues.
     */
    public void run() {
        // Written concurrently by the async transform callbacks, hence a thread-safe list.
        List<ConnectRecord> afterTransformConnect = new CopyOnWriteArrayList<>();
        while (!stopped) {
            try {
                // Reset before anything in this iteration can throw, so the catch block below
                // never re-reports records of a previous batch that was already offered.
                afterTransformConnect.clear();

                Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
                if (MapUtils.isEmpty(eventRecordMap)) {
                    logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
                    this.waitForRunning(1000);
                    continue;
                }
                Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
                if (MapUtils.isEmpty(latestTransformMap)) {
                    // NOTE(review): the records taken above are not processed on this path;
                    // confirm whether takeEventRecords removes them from the source queue.
                    logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
                    this.waitForRunning(3000);
                    continue;
                }

                List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
                for (Map.Entry<String, List<ConnectRecord>> runnerEntry : eventRecordMap.entrySet()) {
                    // May be null when no engine is registered for this runner; the resulting
                    // NullPointerException is raised inside the async task and is therefore
                    // routed through the exceptionally(...) branch below.
                    TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerEntry.getKey());
                    List<ConnectRecord> curEventRecords = runnerEntry.getValue();
                    curEventRecords.forEach(pullRecord -> {
                        CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
                            .exceptionally((exception) -> {
                                logger.error("transfer do transform event record failed,stackTrace-", exception);
                                errorHandler.handle(pullRecord, exception);
                                // Map the failure to "no result" so the next stage commits the offset.
                                return null;
                            })
                            .thenAccept(pushRecord -> {
                                if (Objects.nonNull(pushRecord)) {
                                    afterTransformConnect.add(pushRecord);
                                } else {
                                    offsetManager.commit(pullRecord);
                                }
                            });
                        completableFutures.add(transformFuture);
                    });
                }
                // Pass a zero-length array so toArray allocates one of exactly the right size.
                // Sizing it by the number of runners could leave trailing null slots when there
                // are fewer futures than runners, which makes allOf throw NullPointerException.
                CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
                circulatorContext.offerTargetTaskQueue(afterTransformConnect);
                logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
            } catch (Exception exception) {
                logger.error("transfer event record failed, stackTrace-", exception);
                afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception));
            }

        }
    }