function createStream()

in backends/kafka.js [69:122]


    /**
     * Build a logger stream that publishes to the Kafka topic
     * `<meta.team>-<meta.project>`.
     *
     * Connection, retry, statsd and batching settings are copied from `this`
     * into the options handed to `KafkaLogger`. A new `Prober` titled
     * 'kafka-winston' is created on every call.
     *
     * @param {Object} meta - supplies `team` and `project`, which are joined
     *     with '-' to form the topic name.
     * @param {Object} [opts] - optional stream settings; only
     *     `highWaterMark` is read. When `opts` is omitted, `highWaterMark`
     *     is forwarded as `undefined`.
     * @returns {*} whatever `LoggerStream(logger, options, destroy)` returns.
     */
    function createStream(meta, opts) {
        var topic = meta.team + '-' + meta.project;

        var kafkaLoggerOptions = {
            topic: topic,
            properties: this.properties,
            dateFormats: {
                ts: 'pyepoch',
                isodate: 'iso'
            },
            leafHost: this.leafHost,
            leafPort: this.leafPort,
            proxyHost: this.proxyHost,
            proxyPort: this.proxyPort,
            maxRetries: this.maxRetries,
            kafkaClient: this.kafkaClient,
            isDisabled: this.isDisabled,
            statsd: this.statsd,
            kafkaProber: new Prober({
                title: 'kafka-winston',
                enabled: true,
                statsd: this.statsd
            }),
            // Always present on the options, even when undefined.
            batching: this.batching
        };

        // The whitelist key is only added when one is configured.
        if (this.batchingWhitelist) {
            kafkaLoggerOptions.batchingWhitelist = this.batchingWhitelist;
        }
        var logger = new KafkaLogger(kafkaLoggerOptions);

        return LoggerStream(logger, {
            // `opts` may be omitted. Dereferencing it unguarded would throw
            // after the logger above was already constructed, leaving the
            // caller with no stream to destroy it through.
            highWaterMark: opts ? opts.highWaterMark : undefined
        }, function destroy() {
            /*jshint camelcase: false*/
            // Release whatever clients the logger currently holds.
            if (logger.kafkaClient) {
                // Destroy the innermost connection object of this topic's
                // producer, if the whole chain exists (presumably the raw
                // socket -- confirm against the kafka client library).
                var producer = logger.kafkaClient.get_producer(topic);
                if (producer && producer.connection &&
                    producer.connection.connection &&
                    producer.connection.connection._connection
                ) {
                    producer.connection.connection._connection.destroy();
                }

                if (logger.kafkaClient.zk) {
                    logger.kafkaClient.zk.close();
                }
            }
            if (logger.kafkaRestClient) {
                logger.kafkaRestClient.close();
            }
        });
    };