in flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java [215:237]
/**
 * Polls the ActiveMQ consumer and forwards every received bytes message to the
 * given source context until {@code runningChecker} reports that the source
 * has stopped.
 *
 * <p>Each iteration first lets {@code exceptionListener} surface any error it
 * has recorded, then waits up to one second for the next message. A receive
 * timeout (no message) simply starts the next iteration; a message that is not
 * a {@link BytesMessage} is logged and skipped.
 *
 * <p>When {@code autoAck} is disabled, messages whose id is accepted by
 * {@code addId} are stored in {@code unacknowledgedMessages} so that they can
 * be acknowledged later.
 *
 * @param ctx context used to emit deserialized records; emission happens while
 *            holding {@code ctx.getCheckpointLock()}
 * @throws Exception if the exception listener reports a failure, or if
 *                   receiving, reading or deserializing a message fails
 */
public void run(SourceContext<OUT> ctx) throws Exception {
    while (runningChecker.isRunning()) {
        exceptionListener.checkErroneous();

        Message message = consumer.receive(1000);
        if (message == null) {
            // receive(timeout) returns null when the timeout expires without a
            // message. That is the normal idle case, not a malformed message,
            // so poll again without logging a warning.
            continue;
        }
        if (!(message instanceof BytesMessage)) {
            LOG.warn("Active MQ source received non bytes message: {}", message);
            continue;
        }

        BytesMessage bytesMessage = (BytesMessage) message;
        byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
        bytesMessage.readBytes(bytes);
        OUT value = deserializationSchema.deserialize(bytes);

        synchronized (ctx.getCheckpointLock()) {
            // Register the id before emitting so that a failure in addId
            // prevents the record from being collected.
            boolean trackForAck = !autoAck && addId(bytesMessage.getJMSMessageID());

            // NOTE(review): the record is emitted even when addId rejects the id
            // (presumably an already-seen message) -- confirm this is intended.
            ctx.collect(value);

            if (trackForAck) {
                unacknowledgedMessages.put(bytesMessage.getJMSMessageID(), bytesMessage);
            }
        }
    }
}