in bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/HttpLoadGen.java [44:85]
/**
 * Creates a bounded transaction queue and starts a background thread that
 * repeatedly drains it, sending every drained transaction over HTTP as
 * URL-encoded JSON.
 *
 * <p>The worker thread loops forever: it calls
 * {@code waitFor(milliseconds, transactionQueue)}, drains whatever the queue
 * holds at that moment, and issues one request per transaction against
 * {@code path + "/" + <encoded json>}. A failed send is logged and skipped;
 * it does not stop the worker. {@code total} counts successful sends only.
 *
 * @param milliseconds value handed to {@code waitFor} before each drain
 *        (presumably the pause between drains -- confirm against waitFor)
 * @return the queue that producers add transactions to; its capacity is
 *         {@code getQueueSize()}
 */
public LinkedBlockingQueue<Transaction> startWriteQueue(final int milliseconds) {
    final LinkedBlockingQueue<Transaction> transactionQueue =
            new LinkedBlockingQueue<Transaction>(getQueueSize());
    new Thread() {
        @Override
        public void run() {
            while (true) {
                waitFor(milliseconds, transactionQueue);
                Stack<Transaction> transactionsToWrite = new Stack<Transaction>();
                // Report what drainTo actually moved: size() read beforehand can
                // be stale because producers keep adding concurrently.
                int drained = transactionQueue.drainTo(transactionsToWrite);
                System.out.println("CLEARING " + drained + " elements from queue.");
                /**
                 * Send the drained transactions over http as json. Iterating
                 * (instead of pop()) keeps the order they were queued in;
                 * popping would send each batch in reverse.
                 */
                for (Transaction transaction : transactionsToWrite) {
                    try {
                        // Explicit charset: the one-argument encode() is deprecated
                        // and depends on the platform default encoding.
                        String trAsJson = URLEncoder.encode(Utils.toJson(transaction), "UTF-8");
                        /**
                         * i.e. wget http://localhost:3000/rpush/guestbook/{"name":"cos boudnick", "state":"...",...}
                         */
                        // NOTE(review): the value returned by Utils.get is not inspected;
                        // confirm whether the response needs to be checked or released.
                        Utils.get(path + "/" + trAsJson);
                        if (total % 20 == 0) {
                            System.out.println("wrote customer " + trAsJson);
                        }
                        // Single increment per successful send (the status line below
                        // must not increment again).
                        total++;
                    } catch (Throwable t) {
                        // Best effort: log and move on to the next transaction.
                        System.err.println("transaction failed.... !");
                        t.printStackTrace();
                    }
                    // Clamp to one second so the rate cannot divide by zero (and kill
                    // this thread) while less than a second has elapsed.
                    long elapsedSeconds = Math.max(1L, (System.currentTimeMillis() - startTime) / 1000);
                    System.out.println("TRANSACTIONS SO FAR " + total + " RATE " + total / elapsedSeconds);
                }
            }
        }
    }.start();
    return transactionQueue;
}