public void start()

in core/src/main/java/org/apache/brooklyn/core/feed/Poller.java [234:354]


    /**
     * Starts this poller: submits the one-off jobs, schedules every periodic poll job,
     * subscribes every triggered poll job to its trigger sensor, runs an initial task for
     * jobs which have no periodic schedule (unless flagged {@code skipInitialRun}),
     * and finally updates the adjunct's trigger highlight.
     *
     * @throws IllegalStateException if this poller has already been started, or if a
     *         triggered poll job already holds a subscription
     */
    public void start() {
        log.debug("Starting poll for {} (using {})", entity, this);
        if (started) { 
            throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", 
                    this, entity));
        }
        started = true;
        
        // NOTE(review): adjunct is null-checked further down but dereferenced unguarded here
        // and when subscribing to triggers -- confirm whether a null adjunct is actually possible
        for (final Callable<?> oneOffJob : oneOffJobs) {
            Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).displayName("Poll").description("One-time poll job "+oneOffJob).build();
            oneOffTasks.add(adjunct.getExecutionContext().submit(task));
        }
        
        // shortest positive period across all poll jobs, and a summary of each trigger; both feed the highlight at the end
        Duration minPeriod = null;
        Set<String> sensorSummaries = MutableSet.of();

        // display name for a job's tasks: the adjunct display name and handler description, skipping blank parts
        final Function<PollJob,String> scheduleNameFn = pollJob -> MutableList.of(adjunct !=null ? adjunct.getDisplayName() : null, pollJob.handler.getDescription())
                .stream().filter(Strings::isNonBlank).collect(Collectors.joining("; "));

        // task factory: wraps a job in a task which does nothing if the entity is not actively managed
        // or (when onlyIfServiceUp is set) if the entity's service-up attribute is not TRUE
        BiFunction<Runnable,String,Task<?>> tf = (job, scheduleName) -> {
            DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity),
                    () -> {
                        if (!Entities.isManagedActive(entity)) {
                            return null;
                        }
                        if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
                            return null;
                        }
                        job.run();
                        return null;
                    });
            // explicitly make non-transient -- we want to see its execution, even if parent is transient
            BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
            return task;
        };

        // candidates for an explicit initial run, grouped by their underlying callable;
        // entries are removed below whenever a periodic schedule is created for that callable
        Multimap<Callable,PollJob> nonScheduledJobs = Multimaps.newSetMultimap(MutableMap.of(), MutableSet::of);
        pollJobs.stream().filter(pj -> !pj.skipInitialRun).forEach(pollJob -> nonScheduledJobs.put(pollJob.job, pollJob));

        // jobs are run initially unless flagged skipInitialRun:
        // for a scheduled task the first run comes from the schedule itself (delayed by one period if skipInitialRun);
        // for a triggered task we collect the distinct callables and run each of those once, after this loop
        // (the poll job model doesn't work perfectly since usually all schedules/triggers are for the same job)

        for (final PollJob<V> pollJob : pollJobs) {
            String scheduleName = scheduleNameFn.apply(pollJob);
            boolean added = false;

            if (pollJob.pollPeriod!=null && pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
                ScheduledTask.Builder tb = ScheduledTask.builder(() -> tf.apply(pollJob.wrappedJob, scheduleName))
                        .cancelOnException(false)
                        .tag(adjunct != null ? BrooklynTaskTags.tagForContextAdjunct(adjunct) : null);
                added = true;
                tb.displayName("Periodic: " + scheduleName);
                tb.period(pollJob.pollPeriod);
                if (pollJob.skipInitialRun) tb.delay(pollJob.pollPeriod);

                if (minPeriod==null || (pollJob.pollPeriod.isShorterThan(minPeriod))) {
                    minPeriod = pollJob.pollPeriod;
                }
                ScheduledTask st = tb.build();
                scheduledTasks.add(st);
                log.debug("Submitting scheduled task {} for poll/feed {}, job {}", st, this, pollJob);
                Entities.submit(entity, st);
                // the schedule covers the initial run, so no separate initial task is needed for this callable
                nonScheduledJobs.removeAll(pollJob.job);
            }

            if (pollJob.pollTriggerSensor !=null) {
                added = true;
                if (pollJob.subscription!=null) {
                    throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already has subscription %s",
                            this, entity, pollJob.subscription));
                }
                String summary = pollJob.pollTriggerSensor.getName();
                if (pollJob.pollTriggerEntity!=null && !pollJob.pollTriggerEntity.equals(entity)) summary += " on "+pollJob.pollTriggerEntity;
                log.debug("Adding subscription to {} for poll/feed {}, job {}", summary, this, pollJob);
                sensorSummaries.add(summary);
                // listen on the configured trigger entity, defaulting to the adjunct's own entity
                pollJob.subscription = adjunct.subscriptions().subscribe(pollJob.pollTriggerEntity !=null ? pollJob.pollTriggerEntity : adjunct.getEntity(), pollJob.pollTriggerSensor, event -> {
                    // submit this on every event
                    try {
                        adjunct.getExecutionContext().submit(tf.apply(pollJob.wrappedJob, scheduleName));
                    } catch (Exception e) {
                        throw Exceptions.propagate(e);
                    }
                });
            }

            if (!added) {
                log.debug("Empty poll job {} in {} for {}; if all jobs are empty (or trigger only), will add a trivial one-time initial task", pollJob, this, entity);
            }
        }

        // no period for these, but we do need to run them initially, but combine if the Callable is the same (e.g. multiple triggers)
        // note the PollJob is one per trigger, and the wrappedJob is specific to the poll job, but doesn't depend on the trigger, so we can just take the first
        nonScheduledJobs.asMap().forEach( (jobC,jobP) -> {
            Runnable job = jobP.iterator().next().wrappedJob;
            String jobSummaries = jobP.stream().map(j -> j.handler.getDescription()).filter(Strings::isNonBlank).collect(Collectors.joining(", "));
            String name =  (adjunct !=null ? adjunct.getDisplayName() : "anonymous")+(Strings.isNonBlank(jobSummaries) ? "; "+jobSummaries : "");
            Task<Object> t = Tasks.builder().dynamic(true).displayName("Initial: " +name)
                    .body(
                            () -> DynamicTasks.queue(tf.apply(job, name)).getUnchecked())
                    .tag(adjunct != null ? BrooklynTaskTags.tagForContextAdjunct(adjunct) : null)
                    .build();
            log.debug("Submitting initial task {} for poll/feed {}, job {} (because otherwise is trigger-only)", t, this, job);
            Entities.submit(entity, t);
        });
        
        if (adjunct !=null) {
            // a missing, practically-infinite or non-positive period is not reported as a period,
            // whether or not triggers are also present
            boolean hasMeaningfulPeriod = minPeriod!=null && !minPeriod.equals(Duration.PRACTICALLY_FOREVER) && minPeriod.isPositive();
            if (sensorSummaries.isEmpty()) {
                if (!hasMeaningfulPeriod) {
                    adjunct.highlightTriggers("Not configured with a period or triggers");
                } else {
                    highlightTriggerPeriod(minPeriod);
                }
            } else if (!hasMeaningfulPeriod) {
                adjunct.highlightTriggers("Triggered by: "+ Strings.join(sensorSummaries, "; "));
            } else {
                // both
                adjunct.highlightTriggers("Running every "+minPeriod+" and on triggers: "+Strings.join(sensorSummaries, "; "));
            }
        }
    }