public List pollResults()

in core/src/main/java/com/taobao/arthas/core/distribution/impl/ResultConsumerImpl.java [52:107]


    public List<ResultModel> pollResults() {
        try {
            lastAccessTime = System.currentTimeMillis();
            long accessTime = lastAccessTime;
            if (lock.tryLock(500, TimeUnit.MILLISECONDS)) {
                polling = true;
                sendingItemCount = 0;
                long firstResultTime = 0;
                // sending delay: time elapsed after firstResultTime
                long sendingDelay = 0;
                // waiting time: time elapsed after access
                long waitingTime = 0;
                List<ResultModel> sendingResults = new ArrayList<ResultModel>(resultBatchSizeLimit);

                while (!closed
                        &&sendingResults.size() < resultBatchSizeLimit
                        && sendingDelay < 100
                        && waitingTime < pollTimeLimit) {
                    ResultModel aResult = resultQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (aResult != null) {
                        sendingResults.add(aResult);
                        //是否为第一次获取到数据
                        if (firstResultTime == 0) {
                            firstResultTime = System.currentTimeMillis();
                        }
                        //判断是否需要立即发送出去
                        if (shouldFlush(sendingResults, aResult)) {
                            break;
                        }
                    } else {
                        if (firstResultTime > 0) {
                            //获取到部分数据后,队列已经取完,计算发送延时时间
                            sendingDelay = System.currentTimeMillis() - firstResultTime;
                        }
                        //计算总共等待时间,长轮询最大等待时间
                        waitingTime = System.currentTimeMillis() - accessTime;
                    }
                }

                //resultQueue.drainTo(sendingResults, resultSizeLimit-sendingResults.size());
                if(logger.isDebugEnabled()) {
                    logger.debug("pollResults: {}, results: {}", sendingResults.size(), JSON.toJSONString(sendingResults));
                }
                return sendingResults;
            }
        } catch (InterruptedException e) {
            //e.printStackTrace();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lastAccessTime = System.currentTimeMillis();
                polling = false;
                lock.unlock();
            }
        }
        return Collections.emptyList();
    }