products/userale/example/kafka-rest-router.js (81 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ const commander = require("commander"); const { program } = require("commander"); const express = require("express"); const bodyParser = require("body-parser"); // Functions. const myParseInt = (value, _dummyPrevious) => { // parseInt takes a string and a radix const parsedValue = parseInt(value, 10); if (isNaN(parsedValue)) { throw new commander.InvalidArgumentError("Not a number."); } return parsedValue; }; const configureCors = (req, res, next) => { res.header("Access-Control-Allow-Origin", "*"); res.header("Access-Control-Allow-Methods", "POST,OPTIONS"); res.header( "Access-Control-Allow-Headers", "Content-Type, Authorization, Content-Length, X-Requested-With", ); // intercept OPTIONS method if ("OPTIONS" == req.method) { res.sendStatus(200); } else { next(); } }; const transformAndForward = async (req, res) => { try { const body = typeof req.body === "string" ? JSON.parse(req.body) : req.body; // Sometimes UserALE sends empty buffers or empty objects, these need to be filtered out const isEmptyArray = Array.isArray(body) && body.length === 0; const isEmptyObject = typeof body === "object" && body !== null && Object.keys(body).length === 0; if (isEmptyArray || isEmptyObject) return; if (options.verbose) console.log(JSON.stringify(body, null, 3)); const transformedPayload = { records: body.map((log) => ({ key: log["sessionId"], value: log, })), }; if (options.verbose) console.log(JSON.stringify(transformedPayload, null, 3)); const response = await fetch(options.forwardTo, { method: "POST", body: JSON.stringify(transformedPayload), headers: { "Content-Type": "application/json" }, }); const statusCode = response.status; res.status(statusCode).send(`Forwarded status code: ${statusCode}`); } catch (error) { console.error("Error: ", error); res.status(500).send("Internal service error"); } }; const gracefulShutdown = () => { process.exit(); }; // Args program .requiredOption("-f, --forward-to <forward-address>", "Forwarding address") .option("-p, --port <port>", "Port number", myParseInt, 8000) .option("-v, --verbose", "Enable verbose mode", false) .parse(process.argv); const options = program.opts(); console.log(options); // Configure app const app = express(); app.set("port", options.port); app.use(configureCors); app.use(bodyParser.urlencoded({ extended: true, limit: "100mb" })); app.use(bodyParser.json({ limit: "100mb" })); app.use(bodyParser.text()); // Routes app.post("/", transformAndForward); // Server app.listen(app.get("port"), () => { console.log( "UserALE Karapace Router started...", `\n\tPort: ${app.get("port")}`, `\n\tForwarding address: ${options.forwardTo}`, `\n\tVerbose: ${options.verbose}`, ); }); process.on("SIGTERM", () => gracefulShutdown()); process.on("SIGINT", () => gracefulShutdown());