in packages/azure-kusto-ingest/src/managedStreamingIngestClient.ts [148:190]
async streamWithRetries(
length: number,
descriptor: AbstractDescriptor,
props?: IngestionPropertiesInput,
clientRequestId?: string,
stream?: Readable | ArrayBuffer,
): Promise<any> {
const isBlob = descriptor instanceof BlobDescriptor;
if (length <= maxStreamSize) {
// If we get buffer that means it was less than the max size, so we can do streamingIngestion
const retry = new ExponentialRetry(attemptCount, this.baseSleepTimeSecs, this.baseJitterSecs);
while (retry.shouldTry()) {
try {
const sourceId =
clientRequestId ??
`KNC.executeManagedStreamingIngest${isBlob ? "FromBlob" : "FromStream"};${descriptor.sourceId};${retry.currentAttempt}`;
if (isBlob) {
return await this.streamingIngestClient.ingestFromBlob(descriptor as BlobDescriptor, props, sourceId);
}
if (isNode) {
return await this.streamingIngestClient.ingestFromStream(
new StreamDescriptor(readableToStream(stream!)).merge(descriptor as StreamDescriptor),
props,
sourceId,
);
}
return await this.streamingIngestClient.ingestFromStream(descriptor as StreamDescriptor, props, sourceId);
} catch (err: unknown) {
const oneApiError = err as { "@permanent"?: boolean };
if (oneApiError["@permanent"]) {
throw err;
}
await retry.backoff();
}
}
stream = isBlob ? undefined : isNode && stream ? readableToStream(stream) : (descriptor as StreamDescriptor).stream;
}
return null;
}