foreign/node/src/wire/message/message.utils.ts (70 lines of code) (raw):
import Debug from 'debug';
import { uint32ToBuf } from '../number.utils.js';
import { serializeHeaders, type Headers } from './header.utils.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { serializePartitioning, type Partitioning } from './partitioning.utils.js';
import { parse as parseUUID } from '../uuid.utils.js';
// Debug logger for this module, created under the 'iggy:client' namespace.
const debug = Debug('iggy:client');
/**
 * Accepted forms of a message id: the number `0`, the bigint `0n`, or a string.
 * `serializeMessageId` turns `0` / `0n` into all-zero bytes and hands strings
 * to the UUID parser.
 */
export type MessageIdKind = 0 | 0n | string;
/** Input shape for a single message passed to `serializeMessage`. */
export type CreateMessage = {
/** Optional message id; when omitted, `serializeMessageId` emits 16 zero bytes. */
id?: MessageIdKind,
/** Optional headers, serialized with `serializeHeaders`. */
headers?: Headers,
/** Message body; a string is converted with `Buffer.from`, a Buffer is used as-is. */
payload: string | Buffer
};
/**
 * Type guard for values that may be used as a message id.
 *
 * Returns `true` for `undefined` (no id supplied), the number `0`, the bigint `0n`
 * and any string; every other value yields `false`. Strings are not checked for
 * UUID syntax here.
 *
 * @param x - Candidate value.
 * @returns Whether `x` is an acceptable message id.
 */
export const isValidMessageId = (x?: unknown): x is MessageIdKind => {
  switch (typeof x) {
    case 'undefined':
    case 'string':
      return true;
    case 'number':
      return x === 0;
    case 'bigint':
      return x === 0n;
    default:
      return false;
  }
};
/**
 * Serializes a message id to bytes.
 *
 * - `undefined`, `0` and `0n` produce a Buffer of 16 zero bytes.
 * - A string is parsed with `parseUUID`; the bytes decoded from the parsed
 *   value's hex form are returned.
 *
 * @param id - Candidate id; values rejected by `isValidMessageId` throw.
 * @returns Buffer holding the id bytes.
 * @throws Error when `id` is not an accepted kind, or when the string cannot be
 *   parsed as a UUID (the underlying error is attached as `cause`).
 */
export const serializeMessageId = (id?: unknown) => {
  if (!isValidMessageId(id)) {
    // String() instead of bare interpolation: interpolating a symbol inside a
    // template literal throws a TypeError, which would hide this error.
    throw new Error(`invalid message id: '${String(id)}' (use uuid string or 0)`);
  }

  if (id === undefined || id === 0 || id === 0n) {
    return Buffer.alloc(16, 0);
  }

  try {
    const uuid = parseUUID(id);
    return Buffer.from(uuid.toHex(), 'hex');
  } catch (err) {
    throw new Error(`invalid message id: '${id}' (use uuid string or 0)`, { cause: err });
  }
};
/**
 * Serializes a single message into one Buffer.
 *
 * The output is the concatenation, in this order, of: the id bytes from
 * `serializeMessageId`, the headers length (via `uint32ToBuf`), the serialized
 * headers, the payload length (via `uint32ToBuf`) and the payload bytes.
 *
 * @param msg - Message to serialize; a string payload is converted with `Buffer.from`.
 * @returns Buffer containing the serialized message.
 * @throws Error propagated from `serializeMessageId` when the id is invalid.
 */
export const serializeMessage = (msg: CreateMessage) => {
  const { id, headers, payload } = msg;

  const bId = serializeMessageId(id);
  const bHeaders = serializeHeaders(headers);
  const bHLen = uint32ToBuf(bHeaders.length);
  const bPayload = typeof payload === 'string' ? Buffer.from(payload) : payload;
  const bPLen = uint32ToBuf(bPayload.length);

  const r = Buffer.concat([
    bId,
    bHLen,
    bHeaders,
    bPLen,
    bPayload
  ]);

  // Hex-dumping headers and payload costs time and memory proportional to the
  // message size, so only build the log arguments when the logger is enabled.
  if (debug.enabled) {
    debug(
      'id', bId.length, bId.toString('hex'),
      // 'binLength/HD', bHLen.length, bHLen.toString('hex'),
      'headers', bHeaders.length, bHeaders.toString('hex'),
      'binLength/PL', bPLen.length, bPLen.toString('hex'),
      'payload', bPayload.length, bPayload.toString('hex'),
      'full len', r.length //, r.toString('hex')
    );
  }

  return r;
};
/**
 * Serializes each message with `serializeMessage` and joins the results,
 * in input order, into a single Buffer.
 *
 * @param messages - Messages to serialize.
 * @returns Concatenation of all serialized messages.
 */
export const serializeMessages = (messages: CreateMessage[]) => {
  const chunks = messages.map(message => serializeMessage(message));
  return Buffer.concat(chunks);
};
/**
 * Builds the body of a send-messages request.
 *
 * The result is the concatenation, in this order, of the serialized stream
 * identifier, topic identifier, partitioning and messages.
 *
 * @param streamId - Stream identifier, serialized with `serializeIdentifier`.
 * @param topicId - Topic identifier, serialized with `serializeIdentifier`.
 * @param messages - Messages to send, serialized with `serializeMessages`.
 * @param partitioning - Optional partitioning, passed to `serializePartitioning`.
 * @returns Buffer containing the concatenated parts.
 */
export const serializeSendMessages = (
  streamId: Id,
  topicId: Id,
  messages: CreateMessage[],
  partitioning?: Partitioning,
) => Buffer.concat([
  serializeIdentifier(streamId),
  serializeIdentifier(topicId),
  serializePartitioning(partitioning),
  serializeMessages(messages)
]);