lib/push_consumer.js (85 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
"use strict";

// Loaded purely for its side effects; must stay ahead of the binding lookup.
require("./env_init");

const assert = require("assert");
const EventEmitter = require("events").EventEmitter;

const common = require("./common");
const binding = common.requireBinding("rocketmq");

// Symbol key of the internal start/shutdown helper so that it does not show
// up as a regular, string-named method on the class.
const START_OR_SHUTDOWN = Symbol("RocketMQPushConsumer#startOrShutdown");

// Lifecycle states stored in `consumer.status`.
const START_STATUS = {
    STOPPED: 0,
    STARTED: 1,
    STOPPING: 2,
    STARTING: 3
};

// Numeric values that a string `options.logLevel` is translated into.
const OPTIONS_LOG_LEVEL = {
    FATAL: 1,
    ERROR: 2,
    WARN: 3,
    INFO: 4,
    DEBUG: 5,
    TRACE: 6,
    NUM: 7
};

const DEFAULT_OPTIONS = {};

// Number of consumers from this module that are currently started.
let consumerRef = 0;

// No-op interval that exists while at least one consumer is started.
// NOTE(review): presumably here to keep the Node.js event loop alive while the
// native consumer is running - confirm against the binding.
let keepAliveTimer;

class RocketMQPushConsumer extends EventEmitter {
    /**
     * RocketMQ PushConsumer constructor
     *
     * Everything the native core hands to its listener is re-emitted on this
     * instance as a `"message"` event.
     *
     * @param {String} groupId the group id
     * @param {String} [instanceName] the instance name
     * @param {Object} options the options; a string `logLevel` is mapped
     *   (case-insensitively) to its numeric value, unknown names fall back
     *   to INFO
     */
    constructor(groupId, instanceName, options) {
        super();

        // Support the two-argument form `(groupId, options)`.
        if(typeof instanceName !== "string" && !options) {
            options = instanceName;
            instanceName = null;
        }

        // Work on a copy so the caller's options object is never mutated.
        options = Object.assign({}, DEFAULT_OPTIONS, options || {});
        if(options.logLevel && typeof options.logLevel === "string") {
            options.logLevel = OPTIONS_LOG_LEVEL[options.logLevel.toUpperCase()] ||
                OPTIONS_LOG_LEVEL.INFO;
        }

        this.core = new binding.PushConsumer(groupId, instanceName, options);
        this.core.setListener(this.emit.bind(this, "message"));
        this.status = START_STATUS.STOPPED;
    }

    /**
     * Invoke `start` or `shutdown` on the native core and keep `status`, the
     * started-consumer count and the keep-alive timer in sync with the result.
     *
     * @param {String} method either "start" or "shutdown"
     * @param {Function} [callback] node-style callback `(err, ret)`
     * @return {Promise|undefined} returns a Promise if no callback
     */
    [START_OR_SHUTDOWN](method, callback) {
        const isStart = method === "start";

        let promise;
        let resolve;
        let reject;
        if(!callback) {
            promise = new Promise((_resolve, _reject) => {
                resolve = _resolve;
                reject = _reject;
            });
        } else {
            resolve = reject = callback;
        }

        this.core[method]((err, ret) => {
            if(err) {
                // The transition failed: fall back to the state held before
                // start()/shutdown() was called. Otherwise the instance stays
                // in STARTING/STOPPING forever and every later call trips its
                // status assertion.
                this.status = isStart ? START_STATUS.STOPPED : START_STATUS.STARTED;
                return reject(err);
            }

            if(isStart) {
                this.status = START_STATUS.STARTED;
                // First started consumer creates the shared no-op interval.
                if(!consumerRef) keepAliveTimer = setInterval(function() {}, 24 * 3600 * 1000);
                consumerRef++;
            } else {
                this.status = START_STATUS.STOPPED;
                consumerRef--;
                // Last stopped consumer releases the shared interval.
                if(!consumerRef) clearInterval(keepAliveTimer);
            }

            // Callback style is `(err, ret)`; promise style resolves with `ret`.
            return callback ? resolve(undefined, ret) : resolve(ret);
        });

        if(!callback) return promise;
    }

    /**
     * Start the push consumer
     *
     * The consumer must currently be stopped, otherwise an AssertionError is
     * thrown synchronously.
     *
     * @param {Function} [callback] the callback function
     * @return {Promise|undefined} returns a Promise if no callback
     */
    start(callback) {
        assert(
            this.status === START_STATUS.STOPPED,
            "push consumer can only be started while it is stopped");
        this.status = START_STATUS.STARTING;
        return this[START_OR_SHUTDOWN]("start", callback);
    }

    /**
     * Shutdown the push consumer
     *
     * The consumer must currently be started, otherwise an AssertionError is
     * thrown synchronously.
     *
     * @param {Function} [callback] the callback function
     * @return {Promise|undefined} returns a Promise if no callback
     */
    shutdown(callback) {
        assert(
            this.status === START_STATUS.STARTED,
            "push consumer can only be shut down while it is started");
        this.status = START_STATUS.STOPPING;
        return this[START_OR_SHUTDOWN]("shutdown", callback);
    }

    /**
     * subscribe a topic
     *
     * Only allowed while the consumer is stopped, i.e. before `start()`.
     *
     * @param {String} topic the topic to be subscribed
     * @param {String} [expression] the additional expression to be subscribed;
     *   an empty string is passed to the core when omitted
     * @return {Number} the subscribe status result
     */
    subscribe(topic, expression) {
        assert(
            this.status === START_STATUS.STOPPED,
            "topics can only be subscribed while the push consumer is stopped");
        assert(topic && typeof topic === "string", "topic must be a non-empty string");
        assert(
            !expression || typeof expression === "string",
            "expression must be a string when provided");

        return this.core.subscribe(topic, expression || "");
    }
}

module.exports = RocketMQPushConsumer;