public void prepare()

in core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java [808:909]


    @Override
    public void prepare(
            Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {

        super.prepare(stormConf, context, collector);

        Config conf = new Config();
        conf.putAll(stormConf);

        checkConfiguration(conf);

        this.taskID = context.getThisTaskId();

        LOG.info("[Fetcher #{}] : starting at {}", taskID, Instant.now());

        int metricsTimeBucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10);

        // Register a "MultiCountMetric" to count different events in this bolt
        // Storm will emit the counts every n seconds to a special bolt via a
        // system stream
        // The data can be accessed by registering a "MetricConsumer" in the
        // topology
        this.eventCounter =
                context.registerMetric(
                        "fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs);

        // create gauges
        context.registerMetric(
                "activethreads",
                () -> {
                    return activeThreads.get();
                },
                metricsTimeBucketSecs);

        context.registerMetric(
                "in_queues",
                () -> {
                    return fetchQueues.inQueues.get();
                },
                metricsTimeBucketSecs);

        context.registerMetric(
                "num_queues",
                () -> {
                    return fetchQueues.queues.size();
                },
                metricsTimeBucketSecs);

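        // per-document averages (e.g. fetch time, bytes fetched) reduced with a
        // mean over each metrics bucket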
        this.averagedMetrics =
                context.registerMetric(
                        "fetcher_average_perdoc",
                        new MultiReducedMetric(new MeanReducer()),
                        metricsTimeBucketSecs);

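        // throughput figures (e.g. pages and bytes per second) derived with a
        // PerSecondReducer over the same bucket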
        this.perSecMetrics =
                context.registerMetric(
                        "fetcher_average_persec",
                        new MultiReducedMetric(new PerSecondReducer()),
                        metricsTimeBucketSecs);

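        // singleton factory providing the Protocol implementations (http, https, ...)
        // shared by all fetcher threads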
        protocolFactory = ProtocolFactory.getInstance(conf);

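        // internal queues grouping incoming URLs by host, domain or IP (depending
        // on the configured partitioning) to enforce politeness; the gauges
        // registered above read this field lazily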
        this.fetchQueues = new FetchItemQueues(conf);

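        // spawn the worker threads which poll the queues and fetch the URLs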
        int threadCount = ConfUtils.getInt(conf, "fetcher.threads.number", 10);
        int startDelay = ConfUtils.getInt(conf, "fetcher.threads.start.delay", 10);

        for (int i = 0; i < threadCount; i++) {
            if (startDelay > 0 && i > 0) {
                // short delay so that DNS or other shared resources are not
                // temporarily exhausted by all threads fetching their first pages
                // simultaneously
                try {
                    Thread.sleep(startDelay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            new FetcherThread(conf, i).start();
        }

        // keep track of the URL currently being fetched by each thread
        beingFetched = new String[threadCount];
        Arrays.fill(beingFetched, "");

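        // if enabled, sitemaps declared in robots.txt are discovered and emitted
        // as outlinks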
        sitemapsAutoDiscovery = ConfUtils.getBoolean(stormConf, SITEMAP_DISCOVERY_PARAM_KEY, false);

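        // hard limit on the total number of URLs held in the internal queues;
        // -1 (the default) disables the limit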
        maxNumberURLsInQueues = ConfUtils.getInt(conf, "fetcher.max.urls.in.queues", -1);

        /*
         * If set to a valid path, e.g. /tmp/fetcher-dump-{port} on a worker node, the content of
         * the queues is dumped to the logs for debugging whenever that file is created. The port
         * number needs to match the one used by the FetcherBolt instance.
         */
        String debugfiletriggerpattern =
                ConfUtils.getString(conf, "fetcherbolt.queue.debug.filepath");

        if (StringUtils.isNotBlank(debugfiletriggerpattern)) {
            debugfiletrigger =
                    new File(
                            debugfiletriggerpattern.replaceAll(
                                    "\\{port\\}", Integer.toString(context.getThisWorkerPort())));
        }
    }
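
A minimal usage sketch (not part of the file above): the configuration keys read in prepare() can be supplied via the Storm Config when the topology is assembled. The key names are taken from the method; the values, the component names and the bolt wiring are illustrative only.

    Config conf = new Config();
    // number of FetcherThread instances started in prepare()
    conf.put("fetcher.threads.number", 50);
    // pause in milliseconds between the start of consecutive fetcher threads
    conf.put("fetcher.threads.start.delay", 10);
    // reporting interval for the metrics registered in prepare()
    conf.put("fetcher.metrics.time.bucket.secs", 10);
    // -1 (the default) leaves the internal queues unbounded
    conf.put("fetcher.max.urls.in.queues", -1);
    // optional trigger file for dumping the queue content; {port} is replaced
    // with the worker port at runtime
    conf.put("fetcherbolt.queue.debug.filepath", "/tmp/fetcher-dump-{port}");

    TopologyBuilder builder = new TopologyBuilder();
    // "partitioner" is a placeholder for whichever upstream component emits the URLs
    builder.setBolt("fetch", new FetcherBolt(), 1).localOrShuffleGrouping("partitioner");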