in adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java [70:118]
public void run() {
List<ConnectRecord> afterTransformConnect = new CopyOnWriteArrayList<>();;
while (!stopped) {
try {
Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
if (MapUtils.isEmpty(eventRecordMap)) {
logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
this.waitForRunning(1000);
continue;
}
Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
if (MapUtils.isEmpty(latestTransformMap)) {
logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
this.waitForRunning(3000);
continue;
}
afterTransformConnect.clear();
List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
for (String runnerName : eventRecordMap.keySet()) {
TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
curEventRecords.forEach(pullRecord -> {
CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
.exceptionally((exception) -> {
logger.error("transfer do transform event record failed,stackTrace-", exception);
errorHandler.handle(pullRecord, exception);
return null;
})
.thenAccept(pushRecord -> {
if (Objects.nonNull(pushRecord)) {
afterTransformConnect.add(pushRecord);
} else {
offsetManager.commit(pullRecord);
}
});
completableFutures.add(transformFuture);
});
}
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get();
circulatorContext.offerTargetTaskQueue(afterTransformConnect);
logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
} catch (Exception exception) {
logger.error("transfer event record failed, stackTrace-", exception);
afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception));
}
}
}