ingestion/streaming/index.js (59 lines of code) (raw):

#! /usr/bin/env node
/**
 * Copyright 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

'use strict';

// Usage: pubsock <ws url> <topic>
//
// Get messages arriving via WebSocket (JSONL-formatted) and
// publish them individually to the specified topic
//
// TODO: Set message attributes from data coming over via websockets

if (process.argv.length < 4) {
  console.error(`Usage: pubsock <WebSocket URL> <topic-name>`);
  process.exit(1);
}

const socketUrl = process.argv[2];
const topicName = process.argv[3];

const WebSocket = require('ws');
const { PubSub } = require('@google-cloud/pubsub');

const pubsub = new PubSub();

// Handle to the destination topic. Assigned once at startup (bottom of the
// file) before any socket handler can run, and read by `inbound`.
let topic;

/**
 * Logs that the WebSocket connection has been established.
 */
const open = function () {
  console.error('Web socket connection opened');
};

/**
 * Publishes one inbound WebSocket message to the topic, tagging it with an
 * `origin` attribute set to the source WebSocket URL.
 *
 * Publish failures are logged and do not stop the process, so one bad
 * message does not interrupt the stream.
 *
 * @param {*} data - Message payload as delivered by the `ws` 'message' event;
 *   it is converted with `Buffer.from` before publishing.
 */
const inbound = function (data) {
  try {
    const payload = Buffer.from(data);
    // NOTE(review): `topic.publisher` is accessed as a property here; confirm
    // this matches the Topic API of the installed @google-cloud/pubsub version.
    topic.publisher.publish(payload, { origin: socketUrl }, (err) => {
      if (err) {
        console.error(`error in publish callback: ${err}`);
      }
    });
  } catch (error) {
    console.error(`caught error publishing message: ${error}`);
  }
};

/**
 * Logs that the WebSocket closed and exits with a non-zero status, since
 * there is nothing left to publish once the source is gone.
 */
const close = function () {
  console.error('Web socket connection closed');
  process.exit(1);
};

/**
 * Logs a WebSocket error and exits with a non-zero status.
 *
 * Without an 'error' listener, an emitted 'error' event is thrown by the
 * EventEmitter and kills the process with a stack trace instead of a
 * readable diagnostic.
 *
 * @param {Error} err - The error emitted by the socket.
 */
const socketError = function (err) {
  console.error(`Web socket error: ${err}`);
  process.exit(1);
};

/**
 * Opens the WebSocket and wires its events to the handlers above.
 *
 * The socket is created here, after the topic is known to exist, and the
 * listeners are attached in the same tick. That way neither the 'open' event
 * nor any early message can fire before a listener is registered.
 */
const publishMessages = function () {
  try {
    const ws = new WebSocket(socketUrl);
    ws.on('open', open);
    ws.on('message', inbound);
    ws.on('error', socketError);
    ws.on('close', close);
  } catch (error) {
    console.error(`Error opening web socket ${socketUrl}: ${error}`);
    process.exit(1);
  }
};

// Startup: make sure the topic exists (creating it if necessary), then start
// relaying messages. Any failure here is fatal.
try {
  topic = pubsub.topic(topicName);
  topic.exists((err, exists) => {
    if (err) {
      console.error(`Error looking for specified topic ${topicName}: ${err}`);
      process.exit(1);
    } else if (exists) {
      publishMessages();
    } else {
      console.error(`Topic ${topicName} not found, creating...`);
      topic.create((createErr, _createdTopic, apiResponse) => {
        if (createErr) {
          console.error(`Could not create non-existent topic ${topicName}: ${apiResponse} ${createErr}`);
          process.exit(1);
        } else {
          console.error(`Created topic ${topicName}`);
          publishMessages();
        }
      });
    }
  });
} catch (error) {
  console.error(`Error: ${error}`);
  process.exit(1);
}