public LinkedBlockingQueue startWriteQueue()

in bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java [36:73]


    public LinkedBlockingQueue<Transaction> startWriteQueue(final int milliseconds){
        if(! path.toFile().isDirectory()) {
            throw new RuntimeException("Input for the queue Should be a directory! Files will be transactions0.txt, transactions1.txt, and so on.");
        }

        /**
         * Write queue.   Every 5 seconds, write
         */
        final LinkedBlockingQueue<Transaction> transactionQueue = new LinkedBlockingQueue<Transaction>(getQueueSize());
        new Thread(){
            @Override
            public void run() {
                int fileNumber=0;
                while(true){
                    waitFor(milliseconds, transactionQueue);
                    System.out.println("Clearing " + transactionQueue.size() + " elements");
                    Stack<Transaction> transactionsToWrite = new Stack<Transaction>();
                    transactionQueue.drainTo(transactionsToWrite);
                    StringBuffer lines = new StringBuffer();
                    try{
                        while(!transactionsToWrite.isEmpty()){
                            lines.append(Utils.toJson(transactionsToWrite.pop())+"\n");
                            total++;
                        }
                        Path outputFile = Paths.get(path.toFile().getAbsolutePath(), "/transactions" + fileNumber++ + ".txt");
                        Files.write(outputFile, lines.toString().getBytes());
                        System.out.println("WRITING FILE to " + outputFile.toFile().length() + "bytes -> " + outputFile.toFile().getAbsolutePath());
                    }
                    catch(Throwable t){
                        t.printStackTrace();
                    }
                    System.out.println(
                            "TRANSACTIONS SO FAR " + total++ +" RATE " + (total/((System.currentTimeMillis()-startTime)/1000) + " per second "));
                }
            }
        }.start();
        return transactionQueue;
    }