application-workloads/pubnub/pubnub-eventhub-bridge/App_Data/jobs/continuous/pnwebjob/PNSubscribeToEH1EH2ToPNPublish.js (117 lines of code) (raw):
// This file takes a PubNub (PN) input and sends it to EH1's input. Then, anything that arrives on EH2's output is
// sent back out via PN Publish. User-defined magic, such as Stream Analytics, should happen between EH1 -> EH2.
'use strict';
// Every setting is read from an environment variable named
// "CUSTOMCONNSTR_" + <setting name>; a setting that is not present reads as undefined.
function readCustomSetting(settingName) {
    return process.env['CUSTOMCONNSTR_' + settingName];
}

// PubNub settings
var PNSubChannel = readCustomSetting('PNSubChannel');
var PNPubChannel = readCustomSetting('PNPubChannel');
var PNAnnounceChannel = readCustomSetting('PNAnnounceChannel');
var PNPublishKey = readCustomSetting('PNPublishKey');
var PNSubscribeKey = readCustomSetting('PNSubscribeKey');

// Azure Event Hub settings
var EHInConnectionString = readCustomSetting('EHInConnectionString');
var EHOutConnectionString = readCustomSetting('EHOutConnectionString');
// Fail fast when any required setting is absent. The two Event Hub connection
// strings are checked as well, because both Event Hub clients are built from
// them further down regardless of any other setting.
// PNAnnounceChannel must be set too; the value "disabled" turns announcing off.
if (!PNSubChannel || !PNPubChannel || !PNAnnounceChannel || !PNPublishKey || !PNSubscribeKey ||
    !EHInConnectionString || !EHOutConnectionString) {
    console.log("Error: Missing required vars!");
    dumpVars();
    // Exit with a non-zero code so the host sees a failure, not a clean stop.
    process.exit(1);
}
/**
 * Prints every configuration value this job reads, followed by the complete
 * process environment, to help diagnose a missing or misnamed setting.
 *
 * NOTE(review): this writes the PubNub keys, both Event Hub connection strings
 * and the whole environment to the log. Kept as-is because it is the job's
 * diagnostic output, but consider redacting secrets before production use.
 */
function dumpVars() {
    console.log("PNSubChannel: ", PNSubChannel);
    console.log("PNPubChannel: ", PNPubChannel);
    // PNAnnounceChannel is one of the required settings, so report it as well.
    console.log("PNAnnounceChannel: ", PNAnnounceChannel);
    console.log("PNPublishKey: ", PNPublishKey);
    console.log("PNSubscribeKey: ", PNSubscribeKey);
    console.log("EHInConnectionString: ", EHInConnectionString);
    console.log("EHOutConnectionString: ", EHOutConnectionString);
    console.log();
    console.log("Env Process Dump:");
    console.log();
    console.log(process.env);
}
// Log the effective configuration once at startup.
dumpVars();

// Give this job instance its own PubNub UUID ("webjob-" plus a random number).
var uuid = "webjob-" + (Math.random() * 1000);
console.log("Setting UUID to " + uuid);

// Build the PubNub client with `new`, which is the construction form the
// PubNub JavaScript SDK documents for the API used in this file (addListener,
// publishKey/subscribeKey). `new` also works if the export is a factory that
// returns an object, so this stays compatible either way.
var PubNub = require("pubnub");
var pubnub = new PubNub({
    ssl: true,
    publishKey: PNPublishKey,
    subscribeKey: PNSubscribeKey,
    uuid: uuid
});
/**
 * Publishes one payload to PNPubChannel and logs the outcome.
 *
 * @param {*} payload - Message to hand to pubnub.publish.
 * @param {string} errorLabel - Log prefix used when the publish reports an error.
 */
function publishToPubNub(payload, errorLabel) {
    pubnub.publish({
        channel: PNPubChannel,
        message: payload
    },
    function (status, response) {
        if (status.error) {
            console.log(errorLabel, status);
            console.log("Message causing error: ", payload);
        } else {
            console.log("message published with server response: ", response);
            console.log("Message published successfully: ", payload);
        }
    });
}

/**
 * Forwards an event to PubNub.
 *
 * When the event body is an array, every element is published as its own
 * message; otherwise the body is published as a single message.
 *
 * @param {Object} ehEvent - Event whose `body` property carries the payload.
 */
var PNPublish = function (ehEvent) {
    var body = ehEvent.body;

    if (Array.isArray(body)) {
        body.forEach(function (element) {
            publishToPubNub(element, "PN Array Element Publish Error: ");
        });
        return;
    }

    console.log("No array detected.");
    // Use a label that matches this branch: the payload is not an array element.
    publishToPubNub(body, "PN Publish Error: ");
};
// Timestamp (ms since epoch) handed to the egress receivers as their
// 'startAfterTime' option. The offset is zero, i.e. "now" at job start.
var receiveLookbackMs = 0;
var receiveAfterTime = Date.now() - receiveLookbackMs;
var EventHubClient = require('azure-event-hubs').Client;
var Promise = require('bluebird');
/**
 * Logs a failure with the "Event Hub Error: " prefix.
 *
 * Tolerates values that are not Error objects: when `err` is null/undefined or
 * has no `message` property, the value itself is serialised so the log line
 * still carries useful detail instead of throwing or printing "undefined".
 *
 * @param {*} err - Error, or any other value describing what went wrong.
 */
var printError = function (err) {
    var detail;
    if (err !== null && err !== undefined && err.message !== undefined) {
        detail = err.message;
    } else {
        try {
            detail = JSON.stringify(err);
        } catch (stringifyFailure) {
            // e.g. circular structures; fall back to plain string conversion.
            detail = String(err);
        }
    }
    console.log("Event Hub Error: " + detail);
};
/************** Create the Ingress Path */
var EHInClient = EventHubClient.fromConnectionString(EHInConnectionString);

// Open the ingress client and then request its partition ids.
// A failure in either step is reported through printError.
var ingressOpened = EHInClient.open();
var fetchIngressPartitionIds = EHInClient.getPartitionIds.bind(EHInClient);
ingressOpened
    .then(fetchIngressPartitionIds)
    .catch(printError);
// Create the ingress sender, then subscribe via PubNub and forward every
// message received on PNSubChannel to that sender.
EHInClient.createSender().then(function (sender) {
    pubnub.addListener({
        message: function (message) {
            console.log("Received Message: ", JSON.stringify(message, null, 4));
            // Sending is asynchronous: report a failed send instead of leaving
            // the rejection unhandled.
            Promise.resolve(sender.send(message)).catch(printError);
        },
        status: function (status) {
            // Status events cover more than errors, so log them as PubNub
            // status rather than as an Event Hub error.
            console.log("PN Status: ", status);
        }
        // A `presence` handler can be added here if presence events are needed.
    });

    pubnub.subscribe({
        channels: [PNSubChannel]
    });

    if (PNAnnounceChannel && PNAnnounceChannel !== "disabled") {
        // NOTE(review): this publishes both Event Hub connection strings as
        // PubNub state on the announce channel. Confirm that anyone able to
        // read that state is meant to have them.
        pubnub.setState({
            channels: [PNAnnounceChannel],
            state: {
                EHInConnectionString: EHInConnectionString,
                EHOutConnectionString: EHOutConnectionString
            }
        },
        function (status) {
            if (status && status.error) {
                console.log("PN Announce Error: ", status);
            }
        });
    }
}).catch(printError); // Covers a failed createSender() and anything thrown during setup.
/************** Create the Egress Path */
var EHOutClient = EventHubClient.fromConnectionString(EHOutConnectionString);

// Wires a newly created receiver: errors go to printError, messages to PNPublish.
function attachEgressHandlers(receiver) {
    receiver.on('errorReceived', printError);
    receiver.on('message', PNPublish);
}

// Creates a receiver on the '$Default' consumer group for one partition,
// starting after receiveAfterTime, and attaches the handlers once it exists.
function openEgressReceiver(partitionId) {
    var receiverOptions = {'startAfterTime': receiveAfterTime};
    return EHOutClient
        .createReceiver('$Default', partitionId, receiverOptions)
        .then(attachEgressHandlers);
}

// Open the egress client, list its partitions and start one receiver per partition.
EHOutClient.open()
    .then(EHOutClient.getPartitionIds.bind(EHOutClient))
    .then(function (partitionIds) {
        return Promise.map(partitionIds, openEgressReceiver);
    })
    .catch(printError);