in core/src/main/java/site/ycsb/Client.java [277:401]
/**
 * Command-line entry point of the client.
 *
 * <p>Parses the arguments into properties, initialises the workload and the client threads,
 * runs all client threads to completion (with an optional terminator thread when a maximum
 * execution time is configured), cleans up the workload and finally exports the measurements.
 *
 * <p>This method never returns normally; it always ends the JVM via {@link System#exit(int)}:
 * status {@code 0} when the run and the export succeed, and {@code -1} when the workload
 * cleanup or the measurement export fails.
 *
 * @param args command-line arguments, passed on to {@code parseArguments}
 */
public static void main(String[] args) {
  Properties props = parseArguments(args);

  boolean status = Boolean.parseBoolean(props.getProperty(STATUS_PROPERTY, String.valueOf(false)));
  String label = props.getProperty(LABEL_PROPERTY, "");

  // Parsed as a long (the variable's own type) so values above Integer.MAX_VALUE are accepted.
  long maxExecutionTime = Long.parseLong(props.getProperty(MAX_EXECUTION_TIME, "0"));

  //get number of threads, target and db
  int threadcount = Integer.parseInt(props.getProperty(THREAD_COUNT_PROPERTY, "1"));
  String dbname = props.getProperty(DB_PROPERTY, "site.ycsb.BasicDB");
  int target = Integer.parseInt(props.getProperty(TARGET_PROPERTY, "0"));

  //compute the target throughput
  // -1 is passed on unchanged when no positive target is configured.
  double targetperthreadperms = -1;
  if (target > 0) {
    double targetperthread = ((double) target) / ((double) threadcount);
    targetperthreadperms = targetperthread / 1000.0;
  }

  Thread warningthread = setupWarningThread();
  warningthread.start();

  Measurements.setProperties(props);

  Workload workload = getWorkload(props);

  final Tracer tracer = getTracer(props, workload);

  initWorkload(props, warningthread, workload, tracer);

  System.err.println("Starting test.");
  final CountDownLatch completeLatch = new CountDownLatch(threadcount);

  final List<ClientThread> clients = initDb(dbname, props, threadcount, targetperthreadperms,
      workload, tracer, completeLatch);

  if (status) {
    // The "standard" status flag is only switched on for the "timeseries" measurement type.
    boolean standardstatus =
        "timeseries".equals(props.getProperty(Measurements.MEASUREMENT_TYPE_PROPERTY, ""));
    int statusIntervalSeconds = Integer.parseInt(props.getProperty("status.interval", "10"));
    // Case-sensitive comparison on purpose: only the exact string "true" enables JVM tracking.
    boolean trackJVMStats = "true".equals(
        props.getProperty(Measurements.MEASUREMENT_TRACK_JVM_PROPERTY,
            Measurements.MEASUREMENT_TRACK_JVM_PROPERTY_DEFAULT));
    statusthread = new StatusThread(completeLatch, clients, label, standardstatus, statusIntervalSeconds,
        trackJVMStats);
    statusthread.start();
  }

  Thread terminator = null;
  long st;
  long en;
  int opsDone;

  try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_SPAN)) {

    // One Java thread per client; the map lets us read each client's op count after its join.
    final Map<Thread, ClientThread> threads = new HashMap<>(threadcount);
    for (ClientThread client : clients) {
      threads.put(new Thread(tracer.wrap(client, "ClientThread")), client);
    }

    st = System.currentTimeMillis();

    for (Thread t : threads.keySet()) {
      t.start();
    }

    if (maxExecutionTime > 0) {
      terminator = new TerminatorThread(maxExecutionTime, threads.keySet(), workload);
      terminator.start();
    }

    opsDone = 0;

    for (Map.Entry<Thread, ClientThread> entry : threads.entrySet()) {
      try {
        entry.getKey().join();
        opsDone += entry.getValue().getOpsDone();
      } catch (InterruptedException ignored) {
        // Deliberately best-effort: the interrupt flag is not restored, because doing so would
        // make every remaining join() fail immediately. The operations of a client whose join
        // was interrupted are not added to the total.
      }
    }

    en = System.currentTimeMillis();
  }

  try {
    try (final TraceScope span = tracer.newScope(CLIENT_CLEANUP_SPAN)) {

      if (terminator != null && !terminator.isInterrupted()) {
        terminator.interrupt();
      }

      if (status) {
        // wake up status thread if it's asleep
        statusthread.interrupt();
        // at this point we assume all the monitored threads are already gone as per above join loop.
        try {
          statusthread.join();
        } catch (InterruptedException ignored) {
          // ignored: the JVM is about to exit anyway
        }
      }

      workload.cleanup();
    }
  } catch (WorkloadException e) {
    // The trace is written to both stderr and stdout, as before.
    e.printStackTrace();
    e.printStackTrace(System.out);
    // A failed cleanup aborts the run before measurements are exported, so report it with a
    // non-zero status (same value as the export-failure path below) instead of success.
    System.exit(-1);
  }

  try {
    try (final TraceScope span = tracer.newScope(CLIENT_EXPORT_MEASUREMENTS_SPAN)) {
      exportMeasurements(props, opsDone, en - st);
    }
  } catch (IOException e) {
    System.err.println("Could not export measurements, error: " + e.getMessage());
    e.printStackTrace();
    System.exit(-1);
  }

  System.exit(0);
}