in core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java [168:226]
void runInLoop() throws Throwable {
while (!stop) {
try {
List<MessageExt> list = this.unionConsumer.poll(10);
for (MessageExt messageExt : list) {
byte[] body = messageExt.getBody();
if (body == null || body.length == 0) {
break;
}
String keyClassName = messageExt.getUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME);
String valueClassName = messageExt.getUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME);
String topic = messageExt.getTopic();
int queueId = messageExt.getQueueId();
String brokerName = messageExt.getBrokerName();
MessageQueue queue = new MessageQueue(topic, brokerName, queueId);
mq2Commit.add(queue);
logger.debug("source topic queue:[{}]", queue);
String key = Utils.buildKey(brokerName, topic, queueId);
SourceSupplier.SourceProcessor<K, V> processor = (SourceSupplier.SourceProcessor<K, V>) wrapper.selectProcessor(key);
StreamContextImpl<V> context = new StreamContextImpl<>(properties, producer, mqAdmin, stateStore, key, idleWindowScaner);
processor.preProcess(context);
Pair<K, V> pair = processor.deserialize(keyClassName, valueClassName, body);
long timestamp = prepareTime(messageExt, processor);
Data<K, V> data = new Data<>(pair.getKey(), pair.getValue(), timestamp, new Properties());
context.setKey(pair.getKey());
if (topic.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
logger.debug("shuffle data: [{}]", data);
} else {
logger.debug("source data: [{}]", data);
}
try {
context.forward(data);
} catch (Throwable t) {
logger.error("process error.", t);
throw new DataProcessThrowable(t);
}
}
} catch (Throwable t) {
Object skipDataError = properties.getOrDefault(Constant.SKIP_DATA_ERROR, Boolean.TRUE);
if (skipDataError == Boolean.TRUE) {
logger.error("ignore error, jobId=[{}], skip this data.", topologyBuilder.getJobId(), t);
//ignored
} else {
throw t;
}
}
}
}