packages/azure-kusto-ingest/src/statusQ.ts (122 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import { PeekedMessageItem, QueueClient } from "@azure/storage-queue";
import { ResourceURI } from "./resourceManager.js";
import { StatusMessage } from "./status.js";
class QueueDetails {
constructor(
readonly name: string,
readonly service: QueueClient,
) {}
}
const shuffle = <T>(a: T[]): T[] => {
for (let i = a.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
const temp = a[j];
a[j] = a[i];
a[i] = temp;
}
return a;
};
interface PeekParams {
raw: boolean;
}
interface PopParams {
raw: boolean;
remove: boolean;
}
type Message = PeekedMessageItem | StatusMessage;
export class StatusQueue {
constructor(
readonly getQueuesFunc: () => Promise<ResourceURI[]>,
readonly messageCls: typeof StatusMessage,
) {}
_getQServices(queuesDetails: ResourceURI[]) {
return queuesDetails.map((q) => {
const fullUri = q.uri;
if (!fullUri) {
throw new Error("Empty or null connection string");
}
// chop off sas
const indexOfSas = q.uri.indexOf("?");
const name = indexOfSas > 0 ? q.uri.substring(0, indexOfSas) : q.uri;
return new QueueDetails(name, new QueueClient(fullUri));
});
}
async isEmpty() {
const result = await this.peek(1, { raw: true });
return !result || result.length === 0;
}
decodeContent(content: string) {
return Buffer.from(content, "base64").toString("ascii");
}
deserializeMessage(m: PeekedMessageItem): StatusMessage {
return new this.messageCls(this.decodeContent(m.messageText), null, null);
}
async _peek(qs: QueueDetails[], n: number, options: PeekParams | null): Promise<{ result: Message[]; nonEmptyQs: QueueDetails[]; done: boolean }> {
const result: Message[] = [];
const nonEmptyQs: QueueDetails[] = [];
for (const q of qs) {
const response = await q.service.peekMessages();
const messages = response.peekedMessageItems;
if (messages && messages.length > 0) {
nonEmptyQs.push(q);
}
for (const m of messages) {
if (m && Object.keys(m).length > 0) {
result.push(options && options.raw ? m : this.deserializeMessage(m));
if (result.length === n) {
return { done: true, nonEmptyQs, result };
}
}
}
}
return { done: nonEmptyQs.length === 0, nonEmptyQs, result };
}
async peek(n = 1, options: PeekParams | null = null): Promise<Message[]> {
const queues = await this.getQueuesFunc();
const qServices: QueueDetails[] = shuffle(this._getQServices(queues));
const perQ = qServices.length > 1 ? Math.floor(n / qServices.length) : qServices.length;
// First, iterate evenly and randomly on status queues
const partial = await this._peek(qServices, perQ, options);
if (partial.done) {
return partial.result;
}
const messagesLeftToPeek = n - partial.result.length;
// In case queues are uneven, iterate again. This time, request for all n messages and trim
return (await this._peek(partial.nonEmptyQs, messagesLeftToPeek, options)).result;
}
async _pop(
qs: QueueDetails[],
n: number,
options: PopParams | null,
): Promise<{ result: Message[] & { nonEmptyQs?: QueueDetails[] }; nonEmptyQs: any[]; done: boolean }> {
const nonEmptyQs: any[] = [];
const result = [];
for (const q of qs) {
const response = await q.service.receiveMessages({ numberOfMessages: n });
const messages = response.receivedMessageItems;
for (const m of messages) {
if (m && Object.keys(m).length > 0) {
result.push(options && options.raw ? m : this.deserializeMessage(m));
if (!(options && !options.remove)) {
await q.service.deleteMessage(m.messageId, m.popReceipt);
}
if (result.length === n) {
return { done: true, nonEmptyQs, result };
}
}
}
}
return { done: nonEmptyQs.length === 0, nonEmptyQs, result };
}
async pop(n = 1, options: PopParams | null = null): Promise<Message[]> {
const queues = await this.getQueuesFunc();
const qServices = shuffle(this._getQServices(queues));
const perQ = qServices.length > 1 ? Math.floor(n / qServices.length) : qServices.length;
// First, iterate evenly and randomly on status queues
const partial = await this._pop(qServices, perQ, options);
if (partial.done) {
return partial.result;
}
const messagesLeftToPop = n - partial.result.length;
// In case queues are uneven, iterate again. This time, request for all n messages and trim
const final = await this._pop(partial.result.nonEmptyQs ?? [], messagesLeftToPop, options);
return partial.result.concat(final.result);
}
}