void runInLoop()

in core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java [168:226]


        /**
         * Main loop of the worker: polls the union consumer and pushes every received message
         * through the processor selected for its source queue, until {@code stop} is set.
         *
         * <p>For each message with a non-empty body the loop:
         * <ol>
         *   <li>records the message's queue in {@code mq2Commit};</li>
         *   <li>selects the source processor keyed by broker name, topic and queue id;</li>
         *   <li>deserializes the body using the key/value class names carried in the message's
         *       user properties;</li>
         *   <li>forwards the resulting {@code Data} through a freshly created stream context.</li>
         * </ol>
         * A message whose body is {@code null} or empty is skipped on its own; the remaining
         * messages of the same poll are still processed.
         *
         * <p>Errors are handled according to {@code Constant.SKIP_DATA_ERROR} in
         * {@code properties} (treated as enabled when the key is absent). When enabled, the error
         * is logged and the loop moves on to the next poll; otherwise the error is rethrown and
         * the loop terminates.
         *
         * @throws Throwable any error raised while polling or processing when error skipping is
         *                   disabled; forwarding failures are wrapped in {@code DataProcessThrowable}
         */
        void runInLoop() throws Throwable {
            while (!stop) {
                try {
                    List<MessageExt> list = this.unionConsumer.poll(10);
                    for (MessageExt messageExt : list) {
                        byte[] body = messageExt.getBody();
                        if (body == null || body.length == 0) {
                            // Skip only this empty message; a 'break' here would silently drop
                            // every remaining message of the already-polled batch.
                            continue;
                        }

                        String keyClassName = messageExt.getUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME);
                        String valueClassName = messageExt.getUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME);

                        String topic = messageExt.getTopic();
                        int queueId = messageExt.getQueueId();
                        String brokerName = messageExt.getBrokerName();

                        // Remember which queue this message came from before processing it.
                        MessageQueue queue = new MessageQueue(topic, brokerName, queueId);
                        mq2Commit.add(queue);
                        logger.debug("source topic queue:[{}]", queue);

                        // The processor is looked up per source queue (broker + topic + queue id).
                        String key = Utils.buildKey(brokerName, topic, queueId);
                        SourceSupplier.SourceProcessor<K, V> processor = (SourceSupplier.SourceProcessor<K, V>) wrapper.selectProcessor(key);

                        // A new context is created for every message.
                        StreamContextImpl<V> context = new StreamContextImpl<>(properties, producer, mqAdmin, stateStore, key, idleWindowScaner);

                        processor.preProcess(context);

                        Pair<K, V> pair = processor.deserialize(keyClassName, valueClassName, body);

                        long timestamp = prepareTime(messageExt, processor);

                        Data<K, V> data = new Data<>(pair.getKey(), pair.getValue(), timestamp, new Properties());
                        context.setKey(pair.getKey());
                        if (topic.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
                            logger.debug("shuffle data: [{}]", data);
                        } else {
                            logger.debug("source data: [{}]", data);
                        }

                        try {
                            context.forward(data);
                        } catch (Throwable t) {
                            logger.error("process error.", t);
                            // Wrap so that downstream-processing failures are distinguishable
                            // from poll/deserialize failures.
                            throw new DataProcessThrowable(t);
                        }
                    }

                } catch (Throwable t) {
                    // Compare by value, not by reference: the property may be supplied as the
                    // string "true"/"false" or as a non-canonical Boolean instance, in which case
                    // '== Boolean.TRUE' would wrongly evaluate to false.
                    Object skipDataError = properties.getOrDefault(Constant.SKIP_DATA_ERROR, Boolean.TRUE);
                    boolean skip = Boolean.parseBoolean(String.valueOf(skipDataError));
                    if (skip) {
                        // NOTE(review): the catch wraps the whole batch, so the messages that
                        // follow the failing one in this poll are skipped as well - confirm
                        // that this is the intended granularity.
                        logger.error("ignore error, jobId=[{}], skip this data.", topologyBuilder.getJobId(), t);
                    } else {
                        throw t;
                    }
                }
            }
        }