foreign/node/src/wire/message/partitioning.utils.ts (71 lines of code) (raw):

import { uint32ToBuf, uint64ToBuf } from '../number.utils.js'; import type { ValueOf } from '../../type.utils.js'; export const PartitionKind = { Balanced : 1, PartitionId : 2, MessageKey : 3 } as const; export type PartitionKind = typeof PartitionKind; export type PartitionKindId = keyof PartitionKind; export type PartitionKindValue = ValueOf<PartitionKind> export type Balanced = { kind: PartitionKind['Balanced'], value: null }; export type PartitionId = { kind: PartitionKind['PartitionId'], value: number // uint32 }; // string | uint32/64/128 export type MessageKeyValue = string | number | bigint | Buffer; export type MessageKey = { kind: PartitionKind['MessageKey'], value: MessageKeyValue }; export type Partitioning = Balanced | PartitionId | MessageKey; const Balanced: Balanced = { kind: PartitionKind.Balanced, value: null }; const PartitionId = (id: number): PartitionId => ({ kind: PartitionKind.PartitionId, value: id }); const MessageKey = (key: MessageKeyValue): MessageKey => ({ kind: PartitionKind.MessageKey, value: key }); // Helper export const Partitioning = { Balanced, PartitionId, MessageKey }; export const serializeMessageKey = (v: MessageKeyValue) => { if (v instanceof Buffer) return v; if ('string' === typeof v) return Buffer.from(v); if ('number' === typeof v) return uint32ToBuf(v); if ('bigint' === typeof v) return uint64ToBuf(v); throw new Error(`cannot serialize messageKey ${v}, ${typeof v}`); }; export const serializePartitioningValue = (part: Partitioning): Buffer => { const { kind, value } = part; switch (kind) { case PartitionKind.Balanced: return Buffer.alloc(0); case PartitionKind.PartitionId: return uint32ToBuf(value); case PartitionKind.MessageKey: return serializeMessageKey(value); } }; export const default_partionning: Balanced = { kind: PartitionKind.Balanced, value: null }; export const serializePartitioning = (p?: Partitioning) => { const part = p || default_partionning; const b = Buffer.alloc(2); const bValue = serializePartitioningValue(part); b.writeUint8(part.kind); b.writeUint8(bValue.length, 1); return Buffer.concat([ b, bValue ]); };