public void runExtreme()

in Extremem/src/main/java/com/amazon/corretto/benchmark/extremem/Bootstrap.java [14:460]


  /**
   * Runs one complete simulation from the bootstrap thread.
   *
   * <p>The steps, in order, are:
   * <ol>
   *   <li>Initialize and dump the configuration.</li>
   *   <li>Create the response-time accumulators and, unless individual
   *       thread reporting is configured, the shared per-role memory and
   *       garbage accumulators.</li>
   *   <li>Create the sales-transaction and browsing-history queues, the
   *       product database and the customer database.</li>
   *   <li>Compute per-thread stagger intervals and the simulation start
   *       and end times, and report them.</li>
   *   <li>Construct and start every CustomerThread and ServerThread,
   *       handing out queues round-robin and staggering start times.</li>
   *   <li>Verify that initialization finished before the scheduled start
   *       time; otherwise Configuration.usage() is invoked, which is
   *       documented here as not returning.</li>
   *   <li>Join all threads, emit the reports, and account every object
   *       created here as garbage so that the final net-allocation report
   *       is expected to come out as zero.</li>
   * </ol>
   *
   * <p>An interrupt received while joining a worker thread is ignored and
   * the join is retried.
   */
  public void runExtreme() {
    CustomerThread[] customer_threads;
    ServerThread[] server_threads;

    RelativeTime customer_stagger = null;
    RelativeTime server_stagger = null;

    // Memory accounting: I have no more fields than parent class
    // ExtrememThread.  So my memory usage is accounted for during my
    // construction.

    MemoryLog memory = memoryLog();
    MemoryLog garbage = garbageLog();
    MemoryLog all_threads_accumulator = (
      new MemoryLog(LifeSpan.NearlyForever));
    all_threads_accumulator.memoryFootprint(this);

    Trace.msg(1, "@ ",
              Integer.toString(memory.hashCode()),
              ": Bootstrap.memoryLog()");
    Trace.msg(1, "@ ",
              Integer.toString(garbage.hashCode()),
              ": Bootstrap.garbageLog()");
    Trace.msg(1, "@ ", Integer.toString(all_threads_accumulator.hashCode()),
              ": Bootstrap.all_threads_accumulator");

    // config.initialize() replaces the random number generation seed
    // of this before generating the dictionary.
    config.initialize(this);
    if (config.ReportCSV()) {
      Report.output("All times reported in microseconds");
      config.dumpCSV(this);
    } else {
      config.dump(this);
    }

    CustomerLogAccumulator customer_accumulator;
    ServerLogAccumulator server_accumulator;
    MemoryLog customer_alloc_accumulator, customer_garbage_accumulator;
    MemoryLog server_alloc_accumulator, server_garbage_accumulator;
    customer_accumulator = new CustomerLogAccumulator(this, LifeSpan.NearlyForever, config.ResponseTimeMeasurements());
    server_accumulator = new ServerLogAccumulator(this, LifeSpan.NearlyForever, config.ResponseTimeMeasurements());

    // The shared alloc/garbage accumulators exist only when threads do
    // not report individually; otherwise null is handed to each thread.
    if (!config.ReportIndividualThreads()) {
      customer_alloc_accumulator = new MemoryLog(LifeSpan.NearlyForever);
      customer_alloc_accumulator.memoryFootprint(this);

      Trace.msg(1, "@ ",
                Integer.toString(customer_alloc_accumulator.hashCode()),
                ": Bootstrap.customer_alloc_accumulator");

      customer_garbage_accumulator = new MemoryLog(LifeSpan.NearlyForever);
      customer_garbage_accumulator.memoryFootprint(this);

      Trace.msg(1, "@ ",
                Integer.toString(customer_garbage_accumulator.hashCode()),
                ": Bootstrap.customer_garbage_accumulator");

      server_alloc_accumulator = new MemoryLog(LifeSpan.NearlyForever);
      server_alloc_accumulator.memoryFootprint(this);

      Trace.msg(1, "@ ",
                Integer.toString(server_alloc_accumulator.hashCode()),
                ": Bootstrap.server_alloc_accumulator");

      server_garbage_accumulator = new MemoryLog(LifeSpan.NearlyForever);
      server_garbage_accumulator.memoryFootprint(this);

      Trace.msg(1, "@ ",
                Integer.toString(server_garbage_accumulator.hashCode()),
                ": Bootstrap.server_garbage_accumulator");
    } else {
      customer_alloc_accumulator = null;
      customer_garbage_accumulator = null;
      server_alloc_accumulator = null;
      server_garbage_accumulator = null;
    }

    // Each array allocation below is followed by a Util.referenceArray()
    // call that records it in this thread's memory accounting.
    SalesTransactionQueue[] sales_queues = (
      new SalesTransactionQueue[config.SalesTransactionQueueCount()]);
    Util.referenceArray(this, LifeSpan.NearlyForever,
                        config.SalesTransactionQueueCount());
    for (int i = 0; i < config.SalesTransactionQueueCount(); i++)
      sales_queues[i] = new SalesTransactionQueue(this,
                                                  LifeSpan.NearlyForever);
    BrowsingHistoryQueue[] browsing_queues = (
      new BrowsingHistoryQueue[config.BrowsingHistoryQueueCount()]);
    Util.referenceArray(this, LifeSpan.NearlyForever,
                        config.BrowsingHistoryQueueCount());
    for (int i = 0; i < config.BrowsingHistoryQueueCount(); i++)
      browsing_queues[i] = new BrowsingHistoryQueue(this,
                                                    LifeSpan.NearlyForever);
    Trace.msg(4, "browsing_queues and sales_queues established");

    Products all_products = (
      new Products(this, LifeSpan.NearlyForever, config));
    Trace.msg(4, "all_products established");
    Customers all_customers = new Customers(this, LifeSpan.NearlyForever,
                                            config);
    Trace.msg(4, "all_customers established");

    if (config.CustomerThreads() > 0) {
      // Stagger the Customer threads so they are not all triggered at
      // the same moment in time.
      RelativeTime period = config.CustomerPeriod();
      long period_ns = (
        period.nanoseconds() +
        (config.CustomerPeriod().seconds() * NanosPerSecond));
      long stagger = period_ns / config.CustomerThreads();
      customer_stagger = new RelativeTime(this, stagger / NanosPerSecond,
                                          (int) (stagger % NanosPerSecond));
      Trace.msg(3, "Customer stagger set to: ",
                customer_stagger.toString(this));
    }

    RelativeTime customer_replacement_stagger = null;
    RelativeTime product_replacement_stagger = null;

    if (config.ServerThreads() > 0) {
      // Stagger the Server threads so they are not all triggered at the
      // same moment in time.
      RelativeTime period = config.ServerPeriod();
      long period_ns = (period.nanoseconds() +
                        (config.ServerPeriod().seconds() * NanosPerSecond));
      long stagger = period_ns / config.ServerThreads();
      server_stagger = new RelativeTime(this, stagger / NanosPerSecond,
                                        (int) (stagger % NanosPerSecond));
      customer_replacement_stagger = (
        config.CustomerReplacementPeriod().divideBy(this,
                                                    config.ServerThreads()));
      product_replacement_stagger = (
        config.ProductReplacementPeriod().divideBy(this,
                                                   config.ServerThreads()));
      Trace.msg(3, "Server stagger set to: ", server_stagger.toString(this));
    }

    // In addition to config.InitializationDelay(), reserve 1 second for
    // variable costs of initializing every 2000 threads.
    int start_delay_milliseconds = (
      (config.CustomerThreads() + config.ServerThreads()) / 2);

    RelativeTime start_delay = (config.InitializationDelay().
                                addMillis(this, start_delay_milliseconds));

    String s = start_delay.toString(this);
    Trace.msg(3, "");
    Trace.msg(3, "Simulation starts in ", s);
    Util.abandonEphemeralString(this, s);

    AbsoluteTime now = AbsoluteTime.now(this);
    AbsoluteTime start_time = now.addRelative(this, start_delay);
    start_delay.garbageFootprint(this);
    start_delay = null;
    now.garbageFootprint(this);
    now = null;
    AbsoluteTime end_time = (
      start_time.addRelative(this, config.SimulationDuration()));
    end_time.changeLifeSpan(this, LifeSpan.NearlyForever);

    AbsoluteTime staggered_customer_replacement = (
      new AbsoluteTime(this, start_time));

    AbsoluteTime staggered_product_replacement = (
      new AbsoluteTime(this, start_time));

    // In CSV mode the string comes from Long.toString(), so it is
    // registered explicitly with Util.ephemeralString(); either way it
    // is abandoned once both the report and the trace have used it.
    if (config.ReportCSV()) {
      s = Long.toString(start_time.microseconds());
      Util.ephemeralString(this, s.length());
      Report.output("Simulation start time,", s);
    } else {
      s = start_time.toString(this);
      Report.output("  Simulation starts: ", s);
    }
    Trace.msg(2, "");
    Trace.msg(2, "  Simulation starts: ", s);
    Util.abandonEphemeralString(this, s);

    if (config.ReportCSV()) {
      s = Long.toString(end_time.microseconds());
      Util.ephemeralString(this, s.length());
      Report.output("Simulation end time,", s);
    } else {
      s = end_time.toString(this);
      Report.output("End simulation time: ", s);
    }
    Trace.msg(2, "End simulation time: ", s);
    Trace.msg(2, "");
    Util.abandonEphemeralString(this, s);

    Trace.msg(2, "starting up CustomerThreads: ",
              Integer.toString(config.CustomerThreads()));

    // Initialize and startup all of the threads as specified in
    // config.
    customer_threads = new CustomerThread[config.CustomerThreads()];
    Util.referenceArray(this, LifeSpan.NearlyForever,
                        config.CustomerThreads());

    // NOTE(review): addMinutes(this, 0) is presumably used to obtain a
    // separate copy of start_time that can be discarded independently;
    // confirm against AbsoluteTime.
    AbsoluteTime staggered_start = start_time.addMinutes(this, 0);

    // Queues are handed out round-robin, counting down from the last
    // index and wrapping back to the last index after index 0 is used.
    // NOTE(review): this indexes with -1 if a queue count is zero while
    // threads exist; presumably Configuration rules that out.
    int bq_no = config.BrowsingHistoryQueueCount() - 1;
    int sq_no = config.SalesTransactionQueueCount() - 1;
    for (int i = 0; i < config.CustomerThreads(); i++) {
      customer_threads[i] = (
        new CustomerThread(config, randomLong(), i, all_products,
                           all_customers, browsing_queues[bq_no],
                           sales_queues[sq_no], customer_accumulator,
                           customer_alloc_accumulator,
                           customer_garbage_accumulator, staggered_start,
                           end_time));
      if (bq_no-- == 0)
        bq_no = config.BrowsingHistoryQueueCount() - 1;
      if (sq_no-- == 0)
        sq_no = config.SalesTransactionQueueCount() - 1;
      // customer_stagger is non-null here: the loop body only runs when
      // config.CustomerThreads() > 0, which is when it was computed.
      staggered_start.garbageFootprint(this);
      staggered_start = staggered_start.addRelative(this, customer_stagger);
      customer_threads[i].start(); // will wait for first release
    }
    staggered_start.garbageFootprint(this);
    if (customer_stagger != null)
      customer_stagger.garbageFootprint(this);

    Trace.msg(2, "starting up ServerThreads: ",
              Integer.toString(config.ServerThreads()));

    server_threads = new ServerThread[config.ServerThreads()];
    Util.referenceArray(this,
                        LifeSpan.NearlyForever, config.ServerThreads());

    staggered_start = start_time.addMinutes(this, 0);

    bq_no = config.BrowsingHistoryQueueCount() - 1;
    sq_no = config.SalesTransactionQueueCount() - 1;
    for (int i = 0; i < config.ServerThreads(); i++) {
      server_threads[i] = (
        new ServerThread(config,
                         randomLong(), i, all_products, all_customers,
                         browsing_queues[bq_no], sales_queues[sq_no],
                         server_accumulator, server_alloc_accumulator,
                         server_garbage_accumulator, staggered_start,
                         staggered_customer_replacement,
                         staggered_product_replacement, end_time));
      if (bq_no-- == 0)
        bq_no = config.BrowsingHistoryQueueCount() - 1;
      if (sq_no-- == 0)
        sq_no = config.SalesTransactionQueueCount() - 1;
      // The three stagger values are non-null here: the loop body only
      // runs when config.ServerThreads() > 0.
      staggered_start.garbageFootprint(this);
      staggered_start = staggered_start.addRelative(this, server_stagger);
      staggered_customer_replacement.garbageFootprint(this);
      staggered_customer_replacement = (
        staggered_customer_replacement
        .addRelative(this, customer_replacement_stagger));
      staggered_product_replacement.garbageFootprint(this);
      staggered_product_replacement = (
        staggered_product_replacement
        .addRelative(this, product_replacement_stagger));
      server_threads[i].start(); // will wait for first release
    }
    staggered_start.garbageFootprint(this);

    staggered_customer_replacement.garbageFootprint(this);
    staggered_customer_replacement = null;

    staggered_product_replacement.garbageFootprint(this);
    staggered_product_replacement = null;

    if (server_stagger != null)
      server_stagger.garbageFootprint(this);

    if (customer_replacement_stagger != null)
      customer_replacement_stagger.garbageFootprint(this);
    customer_replacement_stagger = null;

    if (product_replacement_stagger != null)
      product_replacement_stagger.garbageFootprint(this);
    product_replacement_stagger = null;

    now = AbsoluteTime.now(this);
    if (config.ReportCSV()) {
      s = Long.toString(now.microseconds());
      Util.ephemeralString(this, s.length());
      Report.output("Initialization completion time,", s);
    } else {
      s = now.toString(this);
      Report.output("");
      Report.output("Initialization completes at time: ", s);
    }
    Util.abandonEphemeralString(this, s);

    if (now.compare(start_time) > 0) {
      Configuration.usage("Initialization must complete before start."
                          + "  Increase InitializationDelay.");
      // Does not return.
    }
    start_time.garbageFootprint(this);
    start_time = null;

    now.garbageFootprint(this);
    now = null;

    Trace.msg(2, "Joining with customer threads");

    // Each thread will terminate when the end_time is reached.
    for (int i = 0; i < config.CustomerThreads(); i++) {
      try {
        customer_threads[i].join();
      } catch (InterruptedException x) {
        i--;                    // just try it again
      }
    }

    Trace.msg(2, "Joining with server threads");

    for (int i = 0; i < config.ServerThreads(); i++) {
      try {
        server_threads[i].join();
      } catch (InterruptedException x) {
        i--;                    // just try it again
      }
    }

    Trace.msg(2, "Program simulation has ended");
    all_products.report(this);
    all_customers.report(this);
    if (!config.ReportIndividualThreads()) {
      // Release the report lock even if a report call throws.
      Report.acquireReportLock();
      try {
        customer_accumulator.report(this, "(all customer threads)",
                                    config.ReportCSV());
        MemoryLog.report(this, config.ReportCSV(), customer_alloc_accumulator,
                         customer_garbage_accumulator);

        Report.output("");
        Report.output("Bootstrap thread after reporting customer accumulator");
        MemoryLog.report(this, config.ReportCSV(), memory, garbage);

        server_accumulator.report(this, "(all server threads)",
                                  config.ReportCSV());
        MemoryLog.report(this, config.ReportCSV(), server_alloc_accumulator,
                         server_garbage_accumulator);

        // Combine the customer and server allocation tallies, then fold
        // out both garbage tallies; the result is what gets reported
        // below as the net allocation.
        customer_alloc_accumulator.foldInto(server_alloc_accumulator);
        customer_alloc_accumulator.foldOutof(customer_garbage_accumulator);
        customer_alloc_accumulator.foldOutof(server_garbage_accumulator);

        all_threads_accumulator.foldInto(customer_alloc_accumulator);

        Report.output();
        Report.output("Customer/Server thread Net Allocation (expect zero)");
        MemoryLog.reportCumulative(this, config.ReportCSV(),
                                   customer_alloc_accumulator);
      } finally {
        Report.releaseReportLock();
      }
    } else {
      // Individual threads have printed their individual reports.
      for (int i = 0; i < config.CustomerThreads(); i++) {
        all_threads_accumulator.foldInto(customer_threads[i].memoryLog());
        all_threads_accumulator.foldOutof(customer_threads[i].garbageLog());
      }
      for (int i = 0; i < config.ServerThreads(); i++) {
        all_threads_accumulator.foldInto(server_threads[i].memoryLog());
        all_threads_accumulator.foldOutof(server_threads[i].garbageLog());
      }
    }

    for (int i = 0; i < config.ServerThreads(); i++)
      server_threads[i].garbageFootprint(this);
    server_threads = null;
    Util.abandonReferenceArray(this, LifeSpan.NearlyForever,
                               config.ServerThreads());

    for (int i = 0; i < config.CustomerThreads(); i++)
      customer_threads[i].garbageFootprint(this);
    customer_threads = null;
    Util.abandonReferenceArray(this, LifeSpan.NearlyForever,
                               config.CustomerThreads());

    end_time.garbageFootprint(this);
    end_time = null;

    all_customers.garbageFootprint(this);
    all_customers = null;

    all_products.garbageFootprint(this);
    all_products = null;

    for (int i = 0; i < config.BrowsingHistoryQueueCount(); i++) {
      browsing_queues[i].garbageFootprint(this);
      browsing_queues[i] = null;
    }
    Util.abandonReferenceArray(this, LifeSpan.NearlyForever,
                               config.BrowsingHistoryQueueCount());
    browsing_queues = null;

    for (int i = 0; i < config.SalesTransactionQueueCount(); i++) {
      sales_queues[i].garbageFootprint(this);
      sales_queues[i] = null;
    }
    Util.abandonReferenceArray(this, LifeSpan.NearlyForever,
                               config.SalesTransactionQueueCount());
    sales_queues = null;

    // While these objects may not be garbage quite yet, treat them as
    // if they were, so that report on net memory allocations balance
    // out to zero.

    if (!config.ReportIndividualThreads()) {
      server_garbage_accumulator.garbageFootprint(this);
      server_alloc_accumulator.garbageFootprint(this);
      customer_garbage_accumulator.garbageFootprint(this);
      customer_alloc_accumulator.garbageFootprint(this);
    }
    server_accumulator.garbageFootprint(this);
    customer_accumulator.garbageFootprint(this);

    // config and the accumulators are still used below; the calls here
    // only record them as garbage in the accounting.
    config.garbageFootprint(this);
    all_threads_accumulator.garbageFootprint(this);
    this.garbageFootprint(this);

    // This should be empty
    Report.output("");
    Report.output("Bootstrap thread after discarding this");
    MemoryLog.report(this, config.ReportCSV(), memory, garbage);

    all_threads_accumulator.foldInto(memory);
    all_threads_accumulator.foldOutof(garbage);

    // Release the report lock even if a report call throws.
    Report.acquireReportLock();
    try {
      Report.output();
      Report.output("Net allocation for all threads (should be zero)");
      MemoryLog.reportCumulative(this, config.ReportCSV(),
                                 all_threads_accumulator);
    } finally {
      Report.releaseReportLock();
    }
    all_threads_accumulator = null;

    server_accumulator.reportPercentiles(this, config.ReportCSV());
    customer_accumulator.reportPercentiles(this, config.ReportCSV());

    customer_accumulator = null;
    customer_alloc_accumulator  = null;
    customer_garbage_accumulator = null;
    server_accumulator = null;
    server_alloc_accumulator = null;
    server_garbage_accumulator = null;
  }