in backends/kafka.js [69:122]
/**
 * Build a logger stream that writes to the Kafka topic of one team/project.
 *
 * Must be invoked with `this` bound to the object carrying the Kafka settings
 * read below: properties, leafHost/leafPort, proxyHost/proxyPort, maxRetries,
 * kafkaClient, isDisabled, statsd, batching and (optionally)
 * batchingWhitelist. A fresh Prober titled 'kafka-winston' is created for
 * every stream.
 *
 * @param {Object} meta - supplies `team` and `project`; the topic name is
 *   `<team>-<project>`.
 * @param {Object} [opts] - stream settings; only `highWaterMark` is read.
 *   May be omitted, in which case `highWaterMark` is passed as undefined.
 * @returns {*} the value returned by `LoggerStream(logger, options, destroy)`.
 */
function createStream(meta, opts) {
    var topic = meta.team + '-' + meta.project;

    var kafkaLoggerOptions = {
        topic: topic,
        properties: this.properties,
        dateFormats: {
            ts: 'pyepoch',
            isodate: 'iso'
        },
        leafHost: this.leafHost,
        leafPort: this.leafPort,
        proxyHost: this.proxyHost,
        proxyPort: this.proxyPort,
        maxRetries: this.maxRetries,
        kafkaClient: this.kafkaClient,
        isDisabled: this.isDisabled,
        statsd: this.statsd,
        kafkaProber: new Prober({
            title: 'kafka-winston',
            enabled: true,
            statsd: this.statsd
        }),
        // Always forwarded, even when undefined.
        batching: this.batching
    };

    // The whitelist key is only added when one is configured.
    if (this.batchingWhitelist) {
        kafkaLoggerOptions.batchingWhitelist = this.batchingWhitelist;
    }

    var logger = new KafkaLogger(kafkaLoggerOptions);

    /**
     * Teardown callback handed to LoggerStream (presumably invoked when the
     * stream is destroyed - confirm against LoggerStream). Releases the
     * producer socket for this topic, the client's `zk` handle and the REST
     * client. Each release runs in a `finally` so that a failure in an
     * earlier step cannot leave the later resources open; the first error
     * still propagates to the caller unless a later close throws as well.
     */
    function destroy() {
        /*jshint camelcase: false*/
        var client = logger.kafkaClient;
        var producer;
        var socket;

        try {
            if (client) {
                try {
                    producer = client.get_producer(topic);
                    // Walk down to the raw connection; any missing link
                    // means there is nothing to destroy.
                    socket = producer && producer.connection &&
                        producer.connection.connection &&
                        producer.connection.connection._connection;
                    if (socket) {
                        socket.destroy();
                    }
                } finally {
                    if (client.zk) {
                        client.zk.close();
                    }
                }
            }
        } finally {
            if (logger.kafkaRestClient) {
                logger.kafkaRestClient.close();
            }
        }
    }

    return LoggerStream(logger, {
        // `opts` is optional; without it highWaterMark is left undefined.
        highWaterMark: opts ? opts.highWaterMark : undefined
    }, destroy);
};