in src/com/amazon/kinesis/streaming/agent/tailing/FileTailer.java [71:112]
/**
 * Creates a tailer for the given flow and wires up its collaborators.
 *
 * <p>Besides storing the supplied collaborators, the constructor derives the
 * service name from the parent service name and the flow id, reads the
 * polling/refresh intervals from the flow, and creates (but does not start)
 * a scheduled service that periodically calls {@code emitStatus()}.
 *
 * @param agentContext agent-wide context; also supplies the status reporting period
 * @param flow the flow this tailer serves; supplies the id and timing settings
 * @param fileTracker tracker to use for the source files; when {@code null} a
 *        new {@code SourceFileTracker} is created from {@code agentContext}
 *        and {@code flow}
 * @param publisher publisher service stored for use by this tailer
 * @param parser parser stored for use by this tailer
 * @param checkpoints checkpoint store stored for use by this tailer
 * @throws IOException declared by the original signature; presumably raised
 *         when the fallback {@code SourceFileTracker} cannot be created
 *         (NOTE(review): confirm against SourceFileTracker)
 */
public FileTailer(AgentContext agentContext,
        FileFlow<R> flow,
        SourceFileTracker fileTracker,
        AsyncPublisherService<R> publisher,
        IParser<R> parser,
        FileCheckpointStore checkpoints) throws IOException {
    super();
    this.agentContext = agentContext;
    this.flow = flow;
    this.serviceName = super.serviceName() + "[" + this.flow.getId() + "]";
    this.checkpoints = checkpoints;
    this.publisher = publisher;
    this.parser = parser;
    // Honor the injected tracker instead of silently discarding it; only
    // build a default one when the caller did not provide any.
    this.fileTracker = (fileTracker != null)
            ? fileTracker
            : new SourceFileTracker(this.agentContext, this.flow);
    this.minTimeBetweenFilePollsMillis = flow.minTimeBetweenFilePollsMillis();
    this.maxTimeBetweenFileTrackerRefreshMillis = flow.maxTimeBetweenFileTrackerRefreshMillis();
    // Companion service that reports this tailer's status on a fixed schedule.
    this.metricsEmitter = new AbstractScheduledService() {
        @Override
        protected void runOneIteration() throws Exception {
            FileTailer.this.emitStatus();
        }

        @Override
        protected Scheduler scheduler() {
            // Initial delay and period are both the configured reporting period.
            return Scheduler.newFixedRateSchedule(FileTailer.this.agentContext.logStatusReportingPeriodSeconds(),
                    FileTailer.this.agentContext.logStatusReportingPeriodSeconds(), TimeUnit.SECONDS);
        }

        @Override
        protected String serviceName() {
            return FileTailer.this.serviceName() + ".MetricsEmitter";
        }

        @Override
        protected void shutDown() throws Exception {
            LOGGER.debug("{}: shutting down...", serviceName());
            // Emit status one last time before shutdown
            FileTailer.this.emitStatus();
            super.shutDown();
        }
    };
}