in bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java [36:73]
/**
 * Creates a bounded transaction queue and starts a background thread that
 * periodically flushes its contents to numbered files.
 *
 * <p>The writer thread loops forever. On each pass it calls
 * {@code waitFor(milliseconds, transactionQueue)}, drains everything currently
 * in the queue, serializes each transaction with {@code Utils.toJson} (one JSON
 * document per line, in queue order) and writes the batch to
 * {@code transactions<N>.txt} inside the directory {@code path}, where
 * {@code N} starts at 0 and increases with every file written. A file is
 * written on every pass, even when the drained batch is empty.
 *
 * <p>Failures while serializing or writing a batch are printed and the loop
 * continues; the transactions of a failed batch are not retried.
 *
 * @param milliseconds interval handed to {@code waitFor} before each flush
 *        (NOTE(review): presumably the maximum time to wait between flushes -
 *        confirm against {@code waitFor})
 * @return the queue that producers should add transactions to; its capacity is
 *         {@code getQueueSize()}
 * @throws RuntimeException if {@code path} is not an existing directory
 */
public LinkedBlockingQueue<Transaction> startWriteQueue(final int milliseconds) {
    if (!path.toFile().isDirectory()) {
        throw new RuntimeException("Input for the queue Should be a directory! Files will be transactions0.txt, transactions1.txt, and so on.");
    }

    final LinkedBlockingQueue<Transaction> transactionQueue = new LinkedBlockingQueue<Transaction>(getQueueSize());

    // Writer thread: flushes the queue to a new file after every waitFor(...) call.
    new Thread() {
        @Override
        public void run() {
            int fileNumber = 0;
            while (true) {
                waitFor(milliseconds, transactionQueue);
                System.out.println("Clearing " + transactionQueue.size() + " elements");

                // Drain into a list (not a Stack that is popped) so the file keeps
                // the order in which transactions were queued.
                java.util.List<Transaction> transactionsToWrite = new java.util.ArrayList<Transaction>();
                transactionQueue.drainTo(transactionsToWrite);

                StringBuilder lines = new StringBuilder();
                try {
                    for (Transaction transaction : transactionsToWrite) {
                        lines.append(Utils.toJson(transaction)).append('\n');
                        total++;
                    }
                    // fileNumber only advances once serialization succeeded, so file
                    // names stay contiguous when a batch fails to serialize.
                    Path outputFile = Paths.get(path.toFile().getAbsolutePath(), "transactions" + fileNumber++ + ".txt");
                    // JSON is written as UTF-8 rather than the platform default charset.
                    Files.write(outputFile, lines.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    System.out.println("WRITING FILE to " + outputFile.toFile().length() + "bytes -> " + outputFile.toFile().getAbsolutePath());
                } catch (Throwable t) {
                    // Best effort: report the failure and keep the writer thread alive.
                    t.printStackTrace();
                }

                // Clamp to one second so the rate cannot divide by zero during the
                // first second of operation (which would kill this thread).
                long elapsedSeconds = Math.max(1L, (System.currentTimeMillis() - startTime) / 1000);
                // Report only; the counter is advanced per transaction above.
                System.out.println(
                        "TRANSACTIONS SO FAR " + total + " RATE " + (total / elapsedSeconds) + " per second ");
            }
        }
    }.start();

    return transactionQueue;
}