src/forwarder/index.js (64 lines of code) (raw):

/* Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License */ /* * Autoscaler Forwarder function * * * Forwards PubSub messages from the Scheduler topic to the Poller topic. */ // eslint-disable-next-line no-unused-vars -- for type checking only. const express = require('express'); const {PubSub} = require('@google-cloud/pubsub'); const {logger} = require('../autoscaler-common/logger'); const assertDefined = require('../autoscaler-common/assert-defined'); // GCP service clients const pubSub = new PubSub(); /** * Handle the forwarder request from HTTP * * For testing purposes - uses a fixed message. * * @param {express.Request} req * @param {express.Response} res */ async function forwardFromHTTP(req, res) { const payloadString = '[{ ' + ' "projectId": "memorystore-cluster-autoscaler", ' + ' "instanceId": "my-memorystore-cluster", ' + ' "scalerPubSubTopic": ' + '"projects/memorystore-cluster-autoscaler/topics/my-scaling-topic", ' + ' "minSize": 1, ' + ' "maxSize": 3, ' + ' "stateProjectId" : "memorystore-cluster-autoscaler" ' + '}]'; try { const payload = Buffer.from(payloadString, 'utf8'); JSON.parse(payload.toString()); // Log exception in App project if payload // cannot be parsed const pollerTopicName = assertDefined( process.env.POLLER_TOPIC, 'POLLER_TOPIC environment variable', ); const pollerTopic = pubSub.topic(pollerTopicName); pollerTopic.publishMessage({data: payload}); logger.debug({ message: `Poll request forwarded to PubSub Topic ${pollerTopicName}`, }); res.status(200).end(); } catch (err) { logger.error({ message: `An error occurred in the Autoscaler forwarder (HTTP): ${err}`, err: err, payload: payloadString, }); res.status(500).end('An exception occurred'); } } /** * Handle the Forwarder request from PubSub * * @param {any} pubSubEvent * @param {*} context */ async function forwardFromPubSub(pubSubEvent, context) { let payload; try { payload = Buffer.from(pubSubEvent.data, 'base64'); JSON.parse(payload.toString()); // Log exception in App project if payload // cannot be parsed const pollerTopicName = assertDefined( process.env.POLLER_TOPIC, 'POLLER_TOPIC environment variable', ); const pollerTopic = pubSub.topic(pollerTopicName); pollerTopic.publishMessage({data: payload}); logger.debug({ message: `Poll request forwarded to PubSub Topic ${pollerTopicName}`, }); } catch (err) { logger.error({ message: `An error occurred in the Autoscaler forwarder (PubSub): ${err}`, err: err, payload: payload, }); } } module.exports = { forwardFromHTTP, forwardFromPubSub, };