public void prepare()

in core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java [141:227]


    /**
     * Initialises the bolt from the topology configuration.
     *
     * <p>Copies {@code stormConf} into a private {@link Config}, validates it through {@code
     * checkConfiguration()}, registers the bolt's metrics and gauges with Storm, and then reads
     * the fetcher settings: protocol factory, sitemap auto-discovery, queue mode, crawl delays,
     * throttling limit and protocol metadata prefix.
     *
     * @param stormConf the topology configuration handed over by Storm
     * @param context the topology context, used for the task id and for metric registration
     * @param collector the output collector, forwarded to the superclass
     */
    public void prepare(
            Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);

        // keep a private copy of the topology configuration
        Config localConf = new Config();
        localConf.putAll(stormConf);
        this.conf = localConf;

        checkConfiguration();

        this.taskID = context.getThisTaskId();

        long startTime = System.currentTimeMillis();
        SimpleDateFormat startFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH);
        LOG.info("[Fetcher #{}] : starting at {}", taskID, startFormat.format(startTime));

        // All metrics below share one time bucket. Storm emits their values every
        // bucketSecs seconds on a system stream; a "MetricConsumer" registered in the
        // topology can pick them up from there.
        final int bucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10);

        this.eventCounter =
                context.registerMetric("fetcher_counter", new MultiCountMetric(), bucketSecs);

        MultiReducedMetric meanMetric = new MultiReducedMetric(new MeanReducer());
        this.averagedMetrics = context.registerMetric("fetcher_average", meanMetric, bucketSecs);

        MultiReducedMetric perSecondMetric = new MultiReducedMetric(new PerSecondReducer());
        this.perSecMetrics =
                context.registerMetric("fetcher_average_persec", perSecondMetric, bucketSecs);

        // Gauges: each one simply reports a current value when polled; nothing is reset.
        IMetric activeThreadsGauge =
                new IMetric() {
                    @Override
                    public Object getValueAndReset() {
                        return activeThreads.get();
                    }
                };
        context.registerMetric("activethreads", activeThreadsGauge, bucketSecs);

        IMetric throttlerSizeGauge =
                new IMetric() {
                    @Override
                    public Object getValueAndReset() {
                        return throttler.estimatedSize();
                    }
                };
        context.registerMetric("throttler_size", throttlerSizeGauge, bucketSecs);

        protocolFactory = ProtocolFactory.getInstance(conf);

        // NOTE(review): read from stormConf rather than the private copy in conf;
        // both hold the same entries unless checkConfiguration() alters conf — confirm.
        sitemapsAutoDiscovery = ConfUtils.getBoolean(stormConf, SITEMAP_DISCOVERY_PARAM_KEY, false);

        // Only the three known partitioning modes are accepted; anything else
        // falls back to the host-based mode.
        queueMode = ConfUtils.getString(conf, "fetcher.queue.mode", QUEUE_MODE_HOST);
        boolean knownMode =
                queueMode.equals(QUEUE_MODE_IP)
                        || queueMode.equals(QUEUE_MODE_DOMAIN)
                        || queueMode.equals(QUEUE_MODE_HOST);
        if (!knownMode) {
            LOG.error("Unknown partition mode : {} - forcing to byHost", queueMode);
            queueMode = QUEUE_MODE_HOST;
        }
        LOG.info("Using queue mode : {}", queueMode);

        // Both delays are scaled by 1000 before being stored
        // (presumably seconds -> milliseconds).
        float serverDelay = ConfUtils.getFloat(conf, "fetcher.server.delay", 1.0f);
        this.crawlDelay = (long) (serverDelay * 1000);

        long maxDelay = ConfUtils.getInt(conf, "fetcher.max.crawl.delay", 30);
        this.maxCrawlDelay = maxDelay * 1000;

        this.maxCrawlDelayForce =
                ConfUtils.getBoolean(conf, "fetcher.max.crawl.delay.force", false);
        this.crawlDelayForce = ConfUtils.getBoolean(conf, "fetcher.server.delay.force", false);

        this.maxThrottleSleepMSec = ConfUtils.getLong(conf, "fetcher.max.throttle.sleep", -1);

        // the field's current value is kept as the default when the key is absent
        this.protocolMDprefix =
                ConfUtils.getString(
                        conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, protocolMDprefix);
    }