public void run()

in flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java [215:237]


    public void run(SourceContext<OUT> ctx) throws Exception {
        while (runningChecker.isRunning()) {
            exceptionListener.checkErroneous();

            Message message = consumer.receive(1000);
            if (! (message instanceof BytesMessage)) {
                LOG.warn("Active MQ source received non bytes message: {}", message);
                continue;
            }
            BytesMessage bytesMessage = (BytesMessage) message;
            byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(bytes);
            OUT value = deserializationSchema.deserialize(bytes);
            synchronized (ctx.getCheckpointLock()) {
                if (!autoAck && addId(bytesMessage.getJMSMessageID())) {
                    ctx.collect(value);
                    unacknowledgedMessages.put(bytesMessage.getJMSMessageID(), bytesMessage);
                } else {
                    ctx.collect(value);
                }
            }
        }
    }