in src/com/amazon/kinesis/streaming/agent/Agent.java [186:228]
/**
 * Creates an agent bound to the given context and wires up its collaborators:
 * the logger, the sending executor, the file checkpoint store, the heartbeat
 * service and the periodic metrics emitter.
 *
 * <p>Nothing is started here; the two services are only constructed.
 *
 * @param agentContext the context this agent reads its settings from; it is also
 *        handed to the sending executor factory, the checkpoint store and the
 *        heartbeat service
 */
public Agent(AgentContext agentContext) {
    // NOTE(review): assignment order is kept exactly as before; helpers such as
    // getSendingExecutor(...) may rely on fields assigned earlier (e.g. the logger)
    // -- confirm before reordering.
    this.logger = LoggerFactory.getLogger(Agent.class);
    this.agentContext = agentContext;
    this.sendingExecutor = getSendingExecutor(agentContext);
    this.checkpoints = new SQLiteFileCheckpointStore(agentContext);

    // Heartbeat service that delegates every beat back to this agent.
    // NOTE(review): (1, TimeUnit.SECONDS) is presumably the heartbeat period -- confirm
    // against HeartbeatService.
    this.heartbeat = new HeartbeatService(this.agentContext, 1, TimeUnit.SECONDS) {
        @Override
        protected Object heartbeat(AgentContext agent) {
            return Agent.this.heartbeat(agent);
        }

        @Override
        protected String serviceName() {
            // getClass() inside this body is the anonymous subclass, and the simple name
            // of an anonymous class is the empty string. Name the service after
            // HeartbeatService explicitly so the result is "HeartbeatService[STATE]"
            // rather than just "[STATE]".
            return HeartbeatService.class.getSimpleName() + "[" + state().toString() + "]";
        }
    };

    // Scheduled service that reports the agent status at a fixed rate.
    this.metricsEmitter = new AbstractScheduledService() {
        @Override
        protected void runOneIteration() throws Exception {
            Agent.this.emitStatus();
        }

        @Override
        protected Scheduler scheduler() {
            // The configured reporting period (in seconds) is used both as the initial
            // delay and as the interval between runs.
            return Scheduler.newFixedRateSchedule(Agent.this.agentContext.logStatusReportingPeriodSeconds(),
                    Agent.this.agentContext.logStatusReportingPeriodSeconds(), TimeUnit.SECONDS);
        }

        @Override
        protected String serviceName() {
            return Agent.this.serviceName() + ".MetricsEmitter";
        }

        @Override
        protected void shutDown() throws Exception {
            logger.debug("{}: shutting down...", serviceName());
            // Emit status one last time before shutdown
            Agent.this.emitStatus();
            super.shutDown();
        }
    };

    // Uses the runtime class, so a subclass of Agent is named after itself.
    this.name = getClass().getSimpleName();
}