sdk/servicebus/service-bus/src/util/utils.ts (443 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import Long from "long";
import type { ServiceBusLogger } from "../log.js";
import { logger, receiverLogger, messageLogger } from "../log.js";
import type { AmqpError } from "rhea-promise";
import { OperationTimeoutError, generate_uuid } from "rhea-promise";
import isBuffer from "is-buffer";
import * as Constants from "../util/constants.js";
import type { AbortSignalLike } from "@azure/abort-controller";
import { AbortError } from "@azure/abort-controller";
import type { PipelineResponse } from "@azure/core-rest-pipeline";
import { isDefined } from "@azure/core-util";
import type { HttpResponse } from "./compat/index.js";
import { toHttpResponse } from "./compat/index.js";
import { ErrorNameConditionMapper, StandardAbortMessage, delay } from "@azure/core-amqp";
import { translateServiceBusError } from "../serviceBusError.js";
/**
 * @internal
 * Builds a unique name by appending a freshly generated uuid to the given string,
 * in the format `{name}-{uuid}`.
 * @param name - The name of the entity; used as the leading part of the result.
 * @returns The string `{name}-{uuid}`.
 */
export function getUniqueName(name: string): string {
  const uuid = generate_uuid();
  return `${name}-${uuid}`;
}
/**
 * @internal
 * Picks the identifier to use: the caller-supplied one when it is a non-empty string,
 * otherwise a newly generated one in the format `{prefix}-{uuid}`.
 * @param prefix - The prefix used when an identifier has to be generated.
 * @param identifier - An optional, caller-supplied identifier.
 * @returns `identifier` when it is truthy; otherwise a generated unique name.
 */
export function ensureValidIdentifier(prefix: string, identifier?: string): string {
  if (identifier) {
    return identifier;
  }
  return getUniqueName(prefix);
}
/**
 * @internal
 * If you try to turn a Guid into a Buffer in .NET, the bytes of the first three groups get
 * flipped within the group, but the last two groups don't get flipped, so we end up with a
 * different byte order. This is the order of bytes needed to make Service Bus recognize the token.
 *
 * @param lockTokenBytes - The lock token whose bytes need to be reordered.
 * @returns A new 16-byte Buffer holding the reordered bytes. When the input is falsy or not a
 * Buffer, the input is returned unchanged.
 */
export function reorderLockToken(lockTokenBytes: Buffer): Buffer {
  if (!(lockTokenBytes && Buffer.isBuffer(lockTokenBytes))) {
    return lockTokenBytes;
  }
  // Source index for every output position: the first three Guid groups (4, 2 and 2 bytes)
  // are reversed in place, the remaining eight bytes keep their order.
  const sourceIndexes = [3, 2, 1, 0, 5, 4, 7, 6, 8, 9, 10, 11, 12, 13, 14, 15];
  return Buffer.from(sourceIndexes.map((sourceIndex) => lockTokenBytes[sourceIndex]));
}
/**
 * @internal
 * Provides the time in milliseconds after which the lock renewal should occur.
 * @param lockedUntilUtc - The time until which the message is locked.
 * @returns `0` when less than one second of lock time remains; otherwise the remaining time
 * minus a safety margin of half the remaining time, capped at 10 seconds.
 */
export function calculateRenewAfterDuration(lockedUntilUtc: Date): number {
  const currentTime = Date.now();
  const lockExpiry = lockedUntilUtc.getTime();
  const msUntilExpiry = lockExpiry - currentTime;
  receiverLogger.verbose("Locked until utc : %d", lockExpiry);
  receiverLogger.verbose("Current time is : %d", currentTime);
  receiverLogger.verbose("Remaining time is : %d", msUntilExpiry);

  // Too close to expiry to schedule anything: renew right away.
  if (msUntilExpiry < 1000) {
    return 0;
  }

  const tenSecondsInMs = 10000;
  const safetyMargin = Math.min(msUntilExpiry / 2, tenSecondsInMs);
  const renewAfter = msUntilExpiry - safetyMargin;
  receiverLogger.verbose("Renew after : %d", renewAfter);
  return renewAfter;
}
/**
 * @internal
 * Converts .NET ticks to a JS Date object.
 *
 * - .NET ticks are counted from `0001-01-01T00:00:00`, while JS Dates are counted from
 *   `1970-01-01T00:00:00`.
 * - The ticks value for `1970-01-01` is `621355968000000000`, so that delta is subtracted to
 *   move the value onto the JS epoch.
 * - One .NET tick is `1/10000000` of a second (100 nanoseconds), while a JS Date counts in
 *   `1/1000` of a second, so the value is divided by `10000`.
 *
 * @param buf - The ticks value as an array of bytes in big-endian order.
 * @returns The JS Date object.
 */
export function convertTicksToDate(buf: number[]): Date {
  const ticksAtUnixEpoch = 621355968000000000;
  const ticksPerMillisecond = 10000;
  const ticks: Long = Long.fromBytesBE(buf);
  const unixTimeInMs = ticks.sub(ticksAtUnixEpoch).div(ticksPerMillisecond).toNumber();
  const date = new Date(unixTimeInMs);
  logger.verbose("The converted date is: %s", date.toString());
  return date;
}
/**
 * @internal
 * Converts any given input to a Buffer.
 *
 * Buffers are passed through untouched. Every other value is serialized with
 * `JSON.stringify()` and the resulting text is encoded as UTF-8.
 *
 * @param input - The input that needs to be converted to a Buffer.
 * @returns The input itself when it already is a buffer, otherwise the UTF-8 encoded JSON text.
 * @throws The error raised while serializing/encoding the input, after logging a warning.
 */
export function toBuffer(input: unknown): Buffer {
  messageLogger.verbose(
    "[utils.toBuffer] The given message body that needs to be converted to buffer is: ",
    input,
  );

  let buffer: any;
  if (!isBuffer(input)) {
    // string, undefined, null, boolean, array, object, number should end up here.
    // `undefined` is coerced to `null` so that the customer gets a `null` value on receive.
    const payload = input === undefined ? null : input;
    try {
      buffer = Buffer.from(JSON.stringify(payload), "utf8");
    } catch (err: any) {
      const msgStart =
        `An error occurred while executing JSON.stringify() on the given input ` + payload;
      const msg = msgStart + `${err instanceof Error ? err.stack : JSON.stringify(err)}`;
      messageLogger.warning("[utils.toBuffer] " + msg);
      throw err instanceof Error ? err : new Error(msg);
    }
  } else {
    buffer = input;
  }

  messageLogger.verbose("[utils.toBuffer] The converted buffer is: %O.", buffer);
  return buffer;
}
/**
 * @internal
 * Helper utility to retrieve the `string` representation of the given value.
 * @param value - The value received from the service.
 * @param nameOfProperty - Property name used in the error message.
 * @returns The string form of `value`.
 * @throws Error when no string could be obtained (the value is undefined or null).
 */
export function getString(value: unknown, nameOfProperty: string): string {
  const text = getStringOrUndefined(value);
  if (text !== undefined) {
    return text;
  }
  throw new Error(
    `"${nameOfProperty}" received from service expected to be a string value and not undefined.`,
  );
}
/**
 * @internal
 * Helper utility to retrieve the `string` representation of the given input.
 * @param value - Any value; converted through its `toString()` method.
 * @returns `value.toString()`, or undefined when `isDefined(value)` is false.
 */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function getStringOrUndefined(value: any): string | undefined {
  return isDefined(value) ? value.toString() : undefined;
}
/**
 * @internal
 * Helper utility to retrieve an `integer` value from the given input.
 * @param value - The value received from the service.
 * @param nameOfProperty - Property name used in the error message.
 * @returns The parsed integer.
 * @throws Error when no integer could be obtained from `value`.
 */
export function getInteger(value: unknown, nameOfProperty: string): number {
  const parsed = getIntegerOrUndefined(value);
  if (parsed !== undefined) {
    return parsed;
  }
  throw new Error(
    `"${nameOfProperty}" received from service expected to be a number value and not undefined.`,
  );
}
/**
 * @internal
 * Helper utility to retrieve an `integer` value from the given input.
 * @param value - Any value; its `toString()` result is handed to `parseInt`.
 * @returns The parsed integer, or undefined when `isDefined(value)` is false or the text
 * does not parse to a number.
 */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function getIntegerOrUndefined(value: any): number | undefined {
  if (isDefined(value)) {
    const parsed = parseInt(value.toString());
    if (!isNaN(parsed)) {
      return parsed;
    }
  }
  return undefined;
}
/**
 * @internal
 * Helper utility to convert ISO-8601 time into Date type.
 * @param value - The time text received from the service.
 * @param nameOfProperty - Property name used in the error raised when `value` is missing.
 * @returns A Date constructed from the string form of `value`.
 */
export function getDate(value: string, nameOfProperty: string): Date {
  const timeText = getString(value, nameOfProperty);
  return new Date(timeText);
}
/**
 * @internal
 * Helper utility to retrieve a `boolean` value from the given input.
 * @param value - The value received from the service.
 * @param nameOfProperty - Property name used in the error message.
 * @returns The parsed boolean.
 * @throws Error when no boolean could be obtained (the value is undefined or null).
 */
export function getBoolean(value: unknown, nameOfProperty: string): boolean {
  const parsed = getBooleanOrUndefined(value);
  if (parsed !== undefined) {
    return parsed;
  }
  throw new Error(
    `"${nameOfProperty}" received from service expected to be a boolean value and not undefined.`,
  );
}
/**
 * @internal
 * Helper utility to retrieve a `boolean` value from the given input.
 * @param value - Any value; compared through its `toString()` result.
 * @returns `true` only when the trimmed, lower-cased text equals `"true"`, `false` for any
 * other defined value, or undefined when `isDefined(value)` is false.
 */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function getBooleanOrUndefined(value: any): boolean | undefined {
  if (isDefined(value)) {
    const normalized = value.toString().trim().toLowerCase();
    return normalized === "true";
  }
  return undefined;
}
/**
 * @internal
 * The constructor of an object literal; helps in differentiating JSON like objects from
 * other kinds of objects.
 */
const EMPTY_JSON_OBJECT_CONSTRUCTOR = {}.constructor;
/**
 * @internal
 * Returns `true` if given input is a JSON like object.
 *
 * `null` is reported as `false`: `typeof null` is `"object"`, so it has to be excluded
 * explicitly before its `constructor` is read (which would otherwise throw).
 *
 * @param value - Any value.
 * @returns `true` when `value` is a non-null object whose constructor is the one used by
 * object literals; `false` otherwise.
 */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function isJSONLikeObject(value: any): boolean {
  // `value.constructor === {}.constructor` differentiates among the "object"s,
  // would filter the JSON objects and won't match any array or other kinds of objects
  // -------------------------------------------------------------------------------
  // Few examples       | typeof obj ==="object" | obj.constructor==={}.constructor
  // -------------------------------------------------------------------------------
  // {abc:1}            | true                   | true
  // ["a","b"]          | true                   | false
  // [{"a":1},{"b":2}]  | true                   | false
  // new Date()         | true                   | false
  // null               | true                   | (not evaluated, reported as false)
  // 123                | false                  | false
  // -------------------------------------------------------------------------------
  return (
    value !== null &&
    typeof value === "object" &&
    value.constructor === EMPTY_JSON_OBJECT_CONSTRUCTOR
  );
}
/**
 * @internal
 * Helper utility to retrieve message count details from given input.
 *
 * @param value - The parsed count details object from the response body, whose count
 * properties are keyed as `{xmlnsPrefix}:{ElementName}`.
 * @returns The message counts. Every count that is missing or not numeric is reported as `0`;
 * when `value` itself is undefined or null all counts are `0`.
 * @throws Error from `getXMLNSPrefix` when `value` is defined but its xmlns prefix cannot be
 * determined.
 */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function getMessageCountDetails(value: any): MessageCountDetails {
  // The missing-value check has to run before the xmlns prefix lookup: the lookup
  // dereferences `value` and would throw, making the all-zero fallback unreachable.
  if (!isDefined(value)) {
    return {
      activeMessageCount: 0,
      deadLetterMessageCount: 0,
      scheduledMessageCount: 0,
      transferMessageCount: 0,
      transferDeadLetterMessageCount: 0,
    };
  }

  const xmlnsPrefix = getXMLNSPrefix(value);
  // Falls back to 0 when the element is absent or does not parse to a number.
  const readCount = (elementName: string): number =>
    parseInt(value[`${xmlnsPrefix}:${elementName}`]) || 0;

  return {
    activeMessageCount: readCount("ActiveMessageCount"),
    deadLetterMessageCount: readCount("DeadLetterMessageCount"),
    scheduledMessageCount: readCount("ScheduledMessageCount"),
    transferMessageCount: readCount("TransferMessageCount"),
    transferDeadLetterMessageCount: readCount("TransferDeadLetterMessageCount"),
  };
}
/**
 * @internal
 * Gets the xmlns prefix from the root of the objects that are part of the parsed response body.
 *
 * The metadata stored under `Constants.XML_METADATA_MARKER` is expected to hold exactly one
 * key of the form `xmlns:{prefix}`; the `{prefix}` part is returned.
 *
 * @param value - A parsed object from the response body.
 * @returns The non-empty xmlns prefix.
 * @throws Error when the metadata is missing, does not hold exactly one key, the key does not
 * start with `xmlns:`, or the prefix is empty.
 */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function getXMLNSPrefix(value: any): string {
  const metadata = value[Constants.XML_METADATA_MARKER];
  if (!metadata) {
    throw new Error(
      `Error occurred while parsing the response body - cannot find the XML_METADATA_MARKER "$" on the object ${JSON.stringify(
        value,
      )}`,
    );
  }

  const metadataKeys = Object.keys(metadata);
  if (metadataKeys.length !== 1) {
    throw new Error(
      `Error occurred while parsing the response body - unexpected number of "xmlns:\${prefix}" keys at ${JSON.stringify(
        metadata,
      )}`,
    );
  }

  const [namespaceKey] = metadataKeys;
  const namespaceMarker = "xmlns:";
  if (!namespaceKey.startsWith(namespaceMarker)) {
    throw new Error(
      `Error occurred while parsing the response body - unexpected key at ${JSON.stringify(
        metadata,
      )}`,
    );
  }

  // Everything after "xmlns:" is the prefix.
  const xmlnsPrefix = namespaceKey.slice(namespaceMarker.length);
  if (!xmlnsPrefix) {
    throw new Error(
      `Error occurred while parsing the response body - unexpected xmlns prefix at ${JSON.stringify(
        metadata,
      )}`,
    );
  }
  return xmlnsPrefix;
}
/**
* Represents type of message count details in ATOM based management operations.
* This is the shape returned by `getMessageCountDetails`.
* @internal
*/
export type MessageCountDetails = {
/** Read from the `ActiveMessageCount` element of the response. */
activeMessageCount: number;
/** Read from the `DeadLetterMessageCount` element of the response. */
deadLetterMessageCount: number;
/** Read from the `ScheduledMessageCount` element of the response. */
scheduledMessageCount: number;
/** Read from the `TransferMessageCount` element of the response. */
transferMessageCount: number;
/** Read from the `TransferDeadLetterMessageCount` element of the response. */
transferDeadLetterMessageCount: number;
};
/**
* Represents type of `AuthorizationRule` in ATOM based management operations.
*
* In this file, instances are parsed from the raw response by `getAuthorizationRulesOrUndefined`
* and turned back into their raw form by `getRawAuthorizationRules`.
*/
export interface AuthorizationRule {
/**
* The claim type. Corresponds to the raw `ClaimType` property.
*/
claimType: string;
/**
* The list of rights ("Manage" | "Send" | "Listen").
* Corresponds to the raw `Rights.AccessRights` property.
*/
accessRights?: ("Manage" | "Send" | "Listen")[];
/**
* The authorization rule key name. Corresponds to the raw `KeyName` property.
*/
keyName: string;
/**
* The primary key for the authorization rule. Corresponds to the raw `PrimaryKey` property.
*/
primaryKey?: string;
/**
* The secondary key for the authorization rule. Corresponds to the raw `SecondaryKey` property.
*/
secondaryKey?: string;
}
/**
 * @internal
 * Helper utility to retrieve array of `AuthorizationRule` from given input,
 * or undefined if not passed in.
 *
 * @param value - The raw authorization rules; its `AuthorizationRule` property holds either a
 * single raw rule or an array of raw rules.
 * @returns The parsed rules, or undefined when `value` is undefined, null or a blank string.
 */
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function getAuthorizationRulesOrUndefined(value: any): AuthorizationRule[] | undefined {
  // Ignore special case as Service Bus treats "" as a valid value for authorization rules
  const isBlankString = typeof value === "string" && value.trim() === "";
  if (isBlankString || !isDefined(value)) {
    return undefined;
  }

  const rawRules = value.AuthorizationRule;
  // A single rule is not wrapped in an array; treat it as a one-element list.
  const rawRuleList = Array.isArray(rawRules) ? rawRules : [rawRules];

  const parsedRules: AuthorizationRule[] = [];
  for (const rawRule of rawRuleList) {
    parsedRules.push(buildAuthorizationRule(rawRule));
  }
  return parsedRules;
}
/**
 * @internal
 * Helper utility to build an instance of parsed authorization rule as `AuthorizationRule` from given input.
 *
 * @param value - The raw rule, with `ClaimType`, `Rights`, `KeyName`, `PrimaryKey` and
 * `SecondaryKey` properties.
 * @returns The parsed rule. A single access right is wrapped into a one-element array.
 */
function buildAuthorizationRule(value: any): AuthorizationRule {
  const rights = value["Rights"];
  let accessRights = isDefined(rights) ? rights["AccessRights"] : undefined;
  // A lone right is not wrapped in an array; normalize it to one.
  if (accessRights && !Array.isArray(accessRights)) {
    accessRights = [accessRights];
  }

  return {
    claimType: value["ClaimType"],
    accessRights,
    keyName: value["KeyName"],
    primaryKey: value["PrimaryKey"],
    secondaryKey: value["SecondaryKey"],
  };
}
/**
 * @internal
 * Helper utility to convert an array of `AuthorizationRule` instances into their raw form.
 *
 * @param authorizationRules - The parsed rules, or undefined.
 * @returns An object whose `AuthorizationRule` property holds the raw rules, or undefined
 * when no rules were passed in.
 * @throws TypeError when `authorizationRules` is defined but is not an array.
 */
export function getRawAuthorizationRules(authorizationRules: AuthorizationRule[] | undefined): any {
  if (!isDefined(authorizationRules)) {
    return undefined;
  }

  if (!Array.isArray(authorizationRules)) {
    throw new TypeError(
      `authorizationRules must be an array of AuthorizationRule objects or undefined, but received ${JSON.stringify(
        authorizationRules,
        undefined,
        2,
      )}`,
    );
  }

  const rawRules: any[] = [];
  for (const rule of authorizationRules) {
    rawRules.push(buildRawAuthorizationRule(rule));
  }
  return { AuthorizationRule: rawRules };
}
/**
 * @internal
 * Helper utility to build an instance of raw authorization rule as RawAuthorizationRule from given `AuthorizationRule` input.
 * @param authorizationRule - parsed Authorization Rule instance
 * @returns The raw rule, including the XML metadata that marks it as a
 * `SharedAccessAuthorizationRule`.
 * @throws TypeError when `authorizationRule` is null or is not a JSON like object.
 */
function buildRawAuthorizationRule(authorizationRule: AuthorizationRule): any {
  // The null check must come first: null has `typeof "object"` and must be rejected here with
  // the descriptive error below rather than reaching the JSON-like check.
  if (authorizationRule === null || !isJSONLikeObject(authorizationRule)) {
    throw new TypeError(
      `Expected authorizationRule input to be a JS object value, but received ${JSON.stringify(
        authorizationRule,
        undefined,
        2,
      )}`,
    );
  }

  const rawAuthorizationRule: any = {
    ClaimType: authorizationRule.claimType,
    // ClaimValue is not settable by the users, but service expects the value for PUT requests
    ClaimValue: "None",
    Rights: {
      AccessRights: authorizationRule.accessRights,
    },
    KeyName: authorizationRule.keyName,
    PrimaryKey: authorizationRule.primaryKey,
    SecondaryKey: authorizationRule.secondaryKey,
  };
  rawAuthorizationRule[Constants.XML_METADATA_MARKER] = {
    "p5:type": "SharedAccessAuthorizationRule",
    "xmlns:p5": "http://www.w3.org/2001/XMLSchema-instance",
  };
  return rawAuthorizationRule;
}
/**
 * @internal
 * Helper utility to check if given string is an absolute URL, i.e. whether it starts with
 * `sb://`, `http://` or `https://` (compared case-insensitively).
 * @param url - The string to check.
 * @returns `true` when `url` starts with one of the recognized schemes.
 */
export function isAbsoluteUrl(url: string): boolean {
  const lowerCasedUrl = url.toLowerCase();
  const absoluteSchemes = ["sb://", "http://", "https://"];
  return absoluteSchemes.some((scheme) => lowerCasedUrl.startsWith(scheme));
}
/**
* Possible values for `status` of the Service Bus messaging entities.
*
* Declared as a union of string literals, so values are plain strings at runtime.
*/
export type EntityStatus =
| "Active"
| "Creating"
| "Deleting"
| "ReceiveDisabled"
| "SendDisabled"
| "Disabled"
| "Renaming"
| "Restoring"
| "Unknown";
/**
* Possible values for `availabilityStatus` of the Service Bus messaging entities.
*
* Declared as a union of string literals, so values are plain strings at runtime.
*/
export type EntityAvailabilityStatus =
| "Available"
| "Limited"
| "Renaming"
| "Restoring"
| "Unknown";
/**
* Signature of a `setTimeout`-compatible function: takes a callback, a delay in milliseconds and
* optional extra arguments, and returns a timer handle.
* Used to type the replaceable `setTimeoutFn` accepted by `waitForTimeoutOrAbortOrResolve`.
* @internal
*/
type setTimeoutArgs = (callback: (...args: any[]) => void, ms: number, ...args: any[]) => any;
/**
* An executor for a function that returns a Promise that obeys both a timeout and an
* optional AbortSignal. The promise returned by `args.actionFn` is raced against a promise
* that rejects when the timeout elapses or the abort signal fires.
*
* @param args - The options for the operation:
* - `actionFn`: The function to run; its promise is the one being raced.
* - `timeoutMs`: The number of milliseconds to allow before rejecting with an OperationTimeoutError.
* - `timeoutMessage`: The message passed to the OperationTimeoutError created on timeout.
* - `abortSignal`: The optional abortSignal associated with the containing operation.
* - `timeoutFunctions`: Optional replacements for `setTimeout`/`clearTimeout`; only here for testing.
* @returns The value that the promise returned by `args.actionFn` resolves with.
* @throws AbortError with `StandardAbortMessage` when the abort signal is already aborted on entry,
* or when it fires before `actionFn` settles.
* @throws OperationTimeoutError when `timeoutMs` elapses before `actionFn` settles.
*
* @internal
*/
export async function waitForTimeoutOrAbortOrResolve<T>(args: {
actionFn: () => Promise<T>;
timeoutMs: number;
timeoutMessage: string;
abortSignal?: AbortSignalLike;
// these are optional and only here for testing.
timeoutFunctions?: {
setTimeoutFn: setTimeoutArgs;
clearTimeoutFn: (timeoutId: any) => void;
};
}): Promise<T> {
// Fail fast: nothing is started when the signal has already been aborted.
if (args.abortSignal && args.abortSignal.aborted) {
throw new AbortError(StandardAbortMessage);
}
let timer: any | undefined = undefined;
let clearAbortSignal: (() => void) | undefined = undefined;
// Cancels the timeout timer and removes the abort listener (when one was registered).
const clearAbortSignalAndTimer = (): void => {
(args.timeoutFunctions?.clearTimeoutFn ?? clearTimeout)(timer);
if (clearAbortSignal) {
clearAbortSignal();
}
};
// This promise never resolves; it only rejects, on abort or on timeout.
const abortOrTimeoutPromise = new Promise<T>((_resolve, reject) => {
clearAbortSignal = checkAndRegisterWithAbortSignal(reject, args.abortSignal);
timer = (args.timeoutFunctions?.setTimeoutFn ?? setTimeout)(() => {
reject(new OperationTimeoutError(args.timeoutMessage));
}, args.timeoutMs);
});
try {
return await Promise.race([abortOrTimeoutPromise, args.actionFn()]);
} finally {
// Runs whichever promise settled first, so the timer and the listener never outlive the call.
clearAbortSignalAndTimer();
}
}
/**
 * Registers a listener for the abort event on the abortSignal that calls your abortFn with an
 * AbortError, and returns a function that will clear the same listener.
 *
 * The listener removes itself from the signal before calling `onAbortFn`.
 *
 * @param onAbortFn - Called with a new AbortError when the signal fires.
 * @param abortSignal - The optional signal to listen on.
 * @returns A function that removes our attached event listener from the abort signal, or an
 * empty function if the abortSignal was not defined.
 * @throws AbortError if the abort signal is already aborted; no listener is registered then.
 *
 * @internal
 */
export function checkAndRegisterWithAbortSignal(
  onAbortFn: (abortError: AbortError) => void,
  abortSignal?: AbortSignalLike,
): () => void {
  if (abortSignal === undefined || abortSignal === null) {
    return () => {
      /** Nothing to do here, no abort signal */
    };
  }

  const signal = abortSignal;
  if (signal.aborted) {
    throw new AbortError(StandardAbortMessage);
  }

  const handleAbort = (): void => {
    signal.removeEventListener("abort", handleAbort);
    onAbortFn(new AbortError(StandardAbortMessage));
  };
  signal.addEventListener("abort", handleAbort);

  return () => signal.removeEventListener("abort", handleAbort);
}
/**
 * @internal
 * The user agent prefix string for the ServiceBus client, in the form
 * `azsdk-js-azureservicebus/{package version}`.
 * See guideline at https://azure.github.io/azure-sdk/general_azurecore.html#telemetry-policy
 */
export const libInfo: string = "azsdk-js-azureservicebus/" + Constants.packageJsonInfo.version;
/**
 * @internal
 * Returns the formatted user agent prefix: the given prefix with all of its spaces removed,
 * followed by a single space and the libInfo.
 *
 * @param prefix - The optional, caller-supplied user agent prefix.
 * @returns `{prefix without spaces} {libInfo}`, or just the libInfo when the prefix is missing,
 * empty or made up of spaces only.
 */
export function formatUserAgentPrefix(prefix?: string): string {
  // A string pattern would replace the first space only; the global regex removes all of them.
  const compactPrefix = (prefix || "").replace(/ /g, "");
  return compactPrefix.length > 0 ? `${compactPrefix} ${libInfo}` : `${libInfo}`;
}
/**
 * @internal
 * Helper method which returns `HttpResponse` from an object of shape `PipelineResponse`.
 * Simply forwards to `toHttpResponse()`.
 * TODO: remove this and use toHttpResponse() directly
 */
export const getHttpResponseOnly = (pipelineResponse: PipelineResponse): HttpResponse => {
  return toHttpResponse(pipelineResponse);
};
/**
* @internal
* Type with the service versions for the ATOM API.
* Only the listed version strings are accepted.
*/
export type ServiceBusAtomAPIVersion = "2021-05" | "2017-04";
/**
 * @internal
 * Waits for one second if a sender is not sendable, then checks again. Throws
 * SenderBusyError if it is still not sendable.
 * Only waits when the operation timeout is greater than one second.
 *
 * @param sendLogger - Logger used for the verbose/warning messages.
 * @param logPrefix - Prefix placed in front of the log messages.
 * @param name - Name of the sender, used in the log and error messages.
 * @param timeout - The operation timeout in milliseconds.
 * @param sender - The sender to check; an undefined sender counts as not sendable.
 * @param outgoingAvaiable - Value logged as "available" after waiting.
 * @returns the actual waiting time in milliseconds: `1000` when it waited, `0` otherwise.
 */
export async function waitForSendable(
  sendLogger: ServiceBusLogger,
  logPrefix: string,
  name: string,
  timeout: number,
  sender:
    | {
        sendable: () => boolean;
        credit: number;
      }
    | undefined,
  outgoingAvaiable: number,
): Promise<number> {
  const oneSecondInMs = 1000;
  const shouldWait = !sender?.sendable() && timeout > oneSecondInMs;

  if (shouldWait) {
    sendLogger.verbose(
      "%s Sender '%s', waiting for 1 second for sender to become sendable",
      logPrefix,
      name,
    );
    await delay(oneSecondInMs);
    sendLogger.verbose(
      "%s Sender '%s' after waiting for a second, credit: %d available: %d",
      logPrefix,
      name,
      sender?.credit,
      outgoingAvaiable,
    );
  }

  // Second check: after the wait (if any) the sender must be sendable by now.
  if (sender?.sendable()) {
    return shouldWait ? oneSecondInMs : 0;
  }

  // let us retry to send the message after some time.
  const msg = `[${logPrefix}] Sender "${name}", cannot send the message right now. Please try later.`;
  sendLogger.warning(msg);
  const amqpError: AmqpError = {
    condition: ErrorNameConditionMapper.SenderBusyError,
    description: msg,
  };
  throw translateServiceBusError(amqpError);
}