in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java [86:111]
protected void process() {
while (running) {
try {
log.info("start flume receive from sink process");
while (running) {
BlockingQueue<Map<String, Object>> blockingQueue = SinkOfFlume.getQueue();
while (blockingQueue != null && !blockingQueue.isEmpty()) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
out = new ObjectOutputStream(bos);
Map<String, Object> message = blockingQueue.take();
out.writeObject(message.get("body"));
out.flush();
byte[] m = bos.toByteArray();
String m1 = new String(m);
bos.close();
FlumeRecord flumeRecord = new FlumeRecord<>();
flumeRecord.setRecord(extractValue(m1));
consume(flumeRecord);
}
}
} catch (Exception e) {
log.error("process error!", e);
}
}
}