protected void process()

in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java [86:111]


    /**
     * Pump loop that moves events from the Flume sink queue into this source.
     *
     * <p>While {@code running} is true, the queue returned by
     * {@code SinkOfFlume.getQueue()} is polled. For every map taken from it, the
     * value stored under the {@code "body"} key is Java-serialized, the resulting
     * bytes are decoded into a {@code String}, that string is passed through
     * {@code extractValue}, and the result is wrapped in a {@code FlumeRecord}
     * and handed to {@code consume}.
     *
     * <p>Any exception other than an interrupt is logged and the loop restarts,
     * so a single bad message does not stop the source. If the thread is
     * interrupted, the interrupt flag is restored and the method returns.
     */
    protected void process() {
        // Upper bound on how long one wait may block before `running` is
        // re-checked; keeps shutdown responsive without spinning on an empty queue.
        final long pollIntervalMillis = 100L;

        while (running) {
            try {
                log.info("start flume receive from sink process");
                while (running) {
                    BlockingQueue<Map<String, Object>> blockingQueue = SinkOfFlume.getQueue();
                    if (blockingQueue == null) {
                        // No queue available (presumably the sink has not created it
                        // yet - TODO confirm); back off instead of busy-looping.
                        Thread.sleep(pollIntervalMillis);
                        continue;
                    }

                    // Timed poll replaces the isEmpty()/take() pair: it parks the thread
                    // while the queue is empty and cannot block forever if another
                    // consumer drains the queue between the check and the take.
                    Map<String, Object> message = blockingQueue.poll(
                            pollIntervalMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
                    if (message == null) {
                        continue;
                    }

                    String m1;
                    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                         ObjectOutput out = new ObjectOutputStream(bos)) {
                        out.writeObject(message.get("body"));
                        out.flush();
                        // NOTE(review): serialized bytes are decoded with the platform
                        // default charset, exactly as before; kept unchanged because
                        // extractValue implementations may depend on this form.
                        m1 = new String(bos.toByteArray());
                    }

                    FlumeRecord flumeRecord = new FlumeRecord<>();
                    flumeRecord.setRecord(extractValue(m1));
                    consume(flumeRecord);
                }
            } catch (InterruptedException e) {
                // Restore the flag for callers up the stack and stop; looping on would
                // make every subsequent blocking call throw immediately.
                Thread.currentThread().interrupt();
                log.info("flume receive process interrupted, stopping");
                return;
            } catch (Exception e) {
                log.error("process error!", e);
            }
        }
    }