public void run()

in src/java/org/apache/nutch/fetcher/Fetcher.java [196:479]


    /**
     * Runs the fetch for this task: starts the queue feeder and the fetcher
     * threads, then checks on them about once per second until no fetcher
     * thread is active any more, or until no request has been started for
     * longer than the timeout derived from <code>mapreduce.task.timeout</code>
     * and <code>fetcher.threads.timeout.divisor</code>.
     *
     * <p>
     * Every pass of the monitoring loop
     * </p>
     * <ul>
     * <li>adds the bytes fetched during the pass to the
     * <code>bytes_downloaded</code> counter and reports the status,</li>
     * <li>if the throughput threshold is enabled, counts the passes with too
     * few fetched pages and empties the fetch queues once the configured
     * number of such passes is reached,</li>
     * <li>if a target bandwidth is set, periodically starts additional fetcher
     * threads or halts running ones,</li>
     * <li>once the queue feeder is no longer alive, lets the fetch queues check
     * their time limit,</li>
     * <li>on timeout, logs the stack of every fetcher thread still alive,
     * empties the fetch queues and returns.</li>
     * </ul>
     *
     * <p>
     * <code>setup</code> is called first, <code>cleanup</code> is called on
     * every exit path.
     * </p>
     *
     * @param innerContext
     *          task context providing the configuration and the counters; it
     *          is also handed to the queue feeder and to every fetcher thread
     * @throws IOException
     *           if one of the called operations fails with an I/O error
     * @throws InterruptedException
     *           if the calling thread is interrupted while delaying the start
     *           of a fetcher thread or while waiting for the queue feeder to
     *           stop after a timeout
     */
    public void run(Context innerContext)
        throws IOException, InterruptedException {

      setup(innerContext);
      try {
        Configuration conf = innerContext.getConfiguration();
        LinkedList<FetcherThread> fetcherThreads = new LinkedList<>();
        FetchItemQueues fetchQueues = new FetchItemQueues(conf);

        int threadCount = conf.getInt("fetcher.threads.fetch", 10);
        LOG.info("Fetcher: threads: {}", threadCount);

        // NUTCH-2582: adapt Tika MIME detector pool size to thread count
        MimeUtil.setPoolSize(Math.max(10, threadCount / 2));

        int timeoutDivisor = conf.getInt("fetcher.threads.timeout.divisor", 2);
        LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor);
        if (timeoutDivisor < 1) {
          // zero would fail the division below, a negative value would yield a
          // negative timeout and abort the fetch in the first loop pass
          LOG.warn(
              "fetcher.threads.timeout.divisor can't be < 1 : using 2 instead");
          timeoutDivisor = 2;
        }

        int queueDepthMultiplier = conf.getInt("fetcher.queue.depth.multiplier",
            50);

        QueueFeeder feeder = new QueueFeeder(innerContext, fetchQueues,
            threadCount * queueDepthMultiplier);
        feeder.start();

        int startDelay = conf.getInt("fetcher.threads.start.delay", 10);
        for (int i = 0; i < threadCount; i++) { // spawn threads
          if (startDelay > 0 && i > 0) {
            // short delay to avoid that DNS or other resources are temporarily
            // exhausted by all threads fetching simultaneously the first pages
            Thread.sleep(startDelay);
          }
          FetcherThread t = new FetcherThread(conf, getActiveThreads(),
              fetchQueues, feeder, spinWaiting, lastRequestStart, innerContext,
              errors, segmentName, parsing, storingContent, pages, bytes);
          fetcherThreads.add(t);
          t.start();
        }

        // select a timeout (in milliseconds) that avoids a task timeout
        long timeout = conf.getInt("mapreduce.task.timeout", 10 * 60 * 1000)
            / timeoutDivisor;

        int throughputThresholdNumRetries = 0;

        int throughputThresholdPages = conf
            .getInt("fetcher.throughput.threshold.pages", -1);
        LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages);

        int throughputThresholdMaxRetries = conf
            .getInt("fetcher.throughput.threshold.retries", 5);
        LOG.info("Fetcher: throughput threshold retries: {}",
            throughputThresholdMaxRetries);

        // compared against System.currentTimeMillis() below
        long throughputThresholdTimeLimit = conf
            .getLong("fetcher.throughput.threshold.check.after", -1);

        // configured value times 1000, computed as long so that large targets
        // do not overflow
        long targetBandwidth = conf.getInt("fetcher.bandwidth.target", -1)
            * 1000L;
        int maxNumThreads = conf.getInt("fetcher.maxNum.threads", threadCount);
        if (maxNumThreads < threadCount) {
          LOG.info(
              "fetcher.maxNum.threads can't be < than {} : using {} instead",
              threadCount, threadCount);
          maxNumThreads = threadCount;
        }
        int bandwidthTargetCheckEveryNSecs = conf
            .getInt("fetcher.bandwidth.target.check.everyNSecs", 30);
        if (bandwidthTargetCheckEveryNSecs < 1) {
          LOG.info(
              "fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : using 1 instead");
          bandwidthTargetCheckEveryNSecs = 1;
        }

        int maxThreadsPerQueue = conf.getInt("fetcher.threads.per.queue", 1);

        int bandwidthTargetCheckCounter = 0;
        long bytesAtLastBWTCheck = 0L;

        do { // wait for threads to exit
          // snapshot the totals to get the pages and bytes processed during
          // this pass (about one second), used for status and threshold check
          int pagesBefore = pages.get();
          long bytesBefore = bytes.get();

          try {
            Thread.sleep(1000);
          } catch (InterruptedException ignored) {
            // an interrupt only cuts this wait short, the loop keeps
            // monitoring until the fetcher threads have exited
          }

          int pagesLastSec = pages.get() - pagesBefore;
          int bytesLastSec = (int) (bytes.get() - bytesBefore);

          innerContext.getCounter("FetcherStatus", "bytes_downloaded")
              .increment(bytesLastSec);

          reportStatus(innerContext, fetchQueues, pagesLastSec, bytesLastSec);

          LOG.info(
              "-activeThreads={}, spinWaiting={}, fetchQueues.totalSize={}, fetchQueues.getQueueCount={}",
              activeThreads, spinWaiting.get(), fetchQueues.getTotalSize(),
              fetchQueues.getQueueCount());

          if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
            fetchQueues.dump();
          }

          // if throughput threshold is enabled
          if (throughputThresholdTimeLimit < System.currentTimeMillis()
              && throughputThresholdPages != -1) {
            // Check if we're dropping below the threshold
            if (pagesLastSec < throughputThresholdPages) {
              throughputThresholdNumRetries++;
              LOG.warn(
                  "{}: dropping below configured threshold of {} pages per second (current throughput: {} pages/sec.)",
                  throughputThresholdNumRetries, throughputThresholdPages,
                  pagesLastSec);

              // Quit if we dropped below threshold too many times
              if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
                LOG.warn(
                    "Dropped below threshold {} times, dropping fetch queues to shut down",
                    throughputThresholdNumRetries);

                // Disable the threshold checker
                throughputThresholdPages = -1;

                // Empty the queues cleanly and get number of items that were
                // dropped
                int hitByThrougputThreshold = fetchQueues.emptyQueues();

                if (hitByThrougputThreshold != 0) {
                  innerContext
                      .getCounter("FetcherStatus", "hitByThrougputThreshold")
                      .increment(hitByThrougputThreshold);
                }
              }
            }
          }

          // adjust the number of threads if a target bandwidth has been set
          if (targetBandwidth > 0) {
            // count this pass first so that the check really runs every
            // bandwidthTargetCheckEveryNSecs passes, the same number the
            // measured bytes are divided by below
            bandwidthTargetCheckCounter++;
            if (bandwidthTargetCheckCounter >= bandwidthTargetCheckEveryNSecs) {
              // read the total once: bytes arriving between two reads would
              // otherwise be missing from the next measurement
              long bytesNow = bytes.get();
              long bpsSinceLastCheck = ((bytesNow - bytesAtLastBWTCheck) * 8)
                  / bandwidthTargetCheckEveryNSecs;

              bytesAtLastBWTCheck = bytesNow;
              bandwidthTargetCheckCounter = 0;

              // read the counter once: threads may exit at any time and a
              // second read could return 0 and fail the division
              int runningThreads = activeThreads.get();
              int averageBdwPerThread = 0;
              if (runningThreads > 0) {
                averageBdwPerThread = (int) (bpsSinceLastCheck
                    / runningThreads);
              }

              LOG.info("averageBdwPerThread : {} kbps",
                  (averageBdwPerThread / 1000));

              if (bpsSinceLastCheck < targetBandwidth
                  && averageBdwPerThread > 0) {
                // check whether it is worth doing e.g. more queues than threads

                if ((fetchQueues.getQueueCount()
                    * maxThreadsPerQueue) > runningThreads) {

                  long remainingBdw = targetBandwidth - bpsSinceLastCheck;
                  // divide in floating point, an integer division would
                  // truncate before Math.round gets to see the fraction
                  int additionalThreads = (int) Math
                      .round((double) remainingBdw / averageBdwPerThread);
                  int availableThreads = maxNumThreads - runningThreads;

                  // never start more threads than are still available
                  additionalThreads = Math.min(availableThreads,
                      additionalThreads);
                  LOG.info(
                      "Has space for more threads ({} vs {} kbps) \t=> adding {} new threads",
                      (bpsSinceLastCheck / 1000), (targetBandwidth / 1000),
                      additionalThreads);
                  // activate new threads
                  for (int i = 0; i < additionalThreads; i++) {
                    FetcherThread thread = new FetcherThread(conf,
                        getActiveThreads(), fetchQueues, feeder, spinWaiting,
                        lastRequestStart, innerContext, errors, segmentName,
                        parsing, storingContent, pages, bytes);
                    fetcherThreads.add(thread);
                    thread.start();
                  }
                }
              } else if (bpsSinceLastCheck > targetBandwidth
                  && averageBdwPerThread > 0) {
                // if the bandwidth we're using is greater than the expected
                // bandwidth, we have to stop some threads
                long excessBdw = bpsSinceLastCheck - targetBandwidth;
                int excessThreads = (int) Math
                    .round((double) excessBdw / averageBdwPerThread);
                LOG.info(
                    "Exceeding target bandwidth ({} vs {} kbps). \t=> excessThreads = {}",
                    (bpsSinceLastCheck / 1000), (targetBandwidth / 1000),
                    excessThreads);
                // keep at least one
                if (excessThreads >= fetcherThreads.size()) {
                  excessThreads = 0;
                }
                // de-activates threads
                for (int i = 0; i < excessThreads; i++) {
                  FetcherThread thread = fetcherThreads.removeLast();
                  thread.setHalted(true);
                }
              }
            }
          }

          // check timelimit
          if (!feeder.isAlive()) {
            int hitByTimeLimit = fetchQueues.checkTimelimit();
            if (hitByTimeLimit != 0) {
              innerContext.getCounter("FetcherStatus", "hitByTimeLimit")
                  .increment(hitByTimeLimit);
            }
          }

          /*
           * Some requests seem to hang, with no fetches finished and no new
           * fetches started during half of the MapReduce task timeout
           * (mapreduce.task.timeout, default value: 10 minutes). In order to
           * avoid that the task timeout is hit and the fetcher job is failed,
           * we stop the fetching now. See also the property
           * fetcher.threads.timeout.divisor.
           */
          if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
            // timeout is held in milliseconds, the message reports seconds
            LOG.warn("Timeout reached with no new requests since {} seconds.",
                timeout / 1000);
            LOG.warn("Aborting with {} hung threads{}.", activeThreads,
                feeder.isAlive() ? " (queue feeder still alive)" : "");
            innerContext.getCounter("FetcherStatus", "hungThreads")
                .increment(activeThreads.get());
            // iterate instead of calling get(i), which walks the linked list
            // from its head for every index
            int threadNum = 0;
            for (FetcherThread thread : fetcherThreads) {
              if (thread.isAlive()) {
                LOG.warn("Thread #{} hung while processing {}", threadNum,
                    thread.getReprUrl());
                StackTraceElement[] stack = thread.getStackTrace();
                StringBuilder sb = new StringBuilder();
                sb.append("Stack of thread #").append(threadNum).append(":\n");
                for (StackTraceElement s : stack) {
                  sb.append(s.toString()).append('\n');
                }
                LOG.warn(sb.toString());
              }
              threadNum++;
            }

            /*
             * signal the queue feeder that the timeout is reached and wait
             * shortly for it to shut down
             */
            fetchQueues.setTimeoutReached();
            if (feeder.isAlive()) {
              LOG.info(
                  "Signaled QueueFeeder to stop, waiting 1.5 seconds before exiting.");
              Thread.sleep(1500);
            }

            /*
             * log and count queued items dropped from the fetch queues because
             * of the timeout
             */
            LOG.warn("Aborting with {} queued fetch items in {} queues{}.",
                fetchQueues.getTotalSize(), fetchQueues.getQueueCount(),
                feeder.isAlive() ? " (queue feeder still alive)" : "");
            int hitByTimeout = fetchQueues.emptyQueues();
            innerContext.getCounter("FetcherStatus", "hitByTimeout")
                .increment(hitByTimeout);
            return;
          }

        } while (activeThreads.get() > 0);
        LOG.info("-activeThreads={}", activeThreads);
      } finally {
        cleanup(innerContext);
      }
    }