in Library/Sender.ts [102:307]
public async send(envelopes: Contracts.EnvelopeTelemetry[], callback?: (v: string) => void) {
if (envelopes) {
var endpointUrl = this._redirectedHost || this._config.endpointUrl;
var endpointHost = new URL(endpointUrl).hostname;
// todo: investigate specifying an agent here: https://nodejs.org/api/http.html#http_class_http_agent
var options = {
method: "POST",
withCredentials: false,
headers: <{ [key: string]: string }>{
"Content-Type": "application/x-json-stream"
}
};
let authHandler = this._getAuthorizationHandler ? this._getAuthorizationHandler(this._config) : null;
if (authHandler) {
if (this._statsbeat) {
this._statsbeat.addFeature(Constants.StatsbeatFeature.AAD_HANDLING);
}
try {
// Add bearer token
await authHandler.addAuthorizationHeader(options);
}
catch (authError) {
let errorMsg = "Failed to get AAD bearer token for the Application.";
if (this._enableDiskRetryMode) {
errorMsg += "This batch of telemetry items will be retried. ";
this._storeToDisk(envelopes);
}
errorMsg += "Error:" + authError.toString();
Logging.warn(Sender.TAG, errorMsg);
if (typeof callback === "function") {
callback(errorMsg);
}
return; // If AAD auth fails do not send to Breeze
}
}
let batch: string = "";
envelopes.forEach(envelope => {
var payload: string = Util.stringify(envelope);
if (typeof payload !== "string") {
return;
}
batch += payload + "\n";
});
// Remove last \n
if (batch.length > 0) {
batch = batch.substring(0, batch.length - 1);
}
let payload: Buffer = Buffer.from ? Buffer.from(batch) : new Buffer(batch);
zlib.gzip(payload, (err, buffer) => {
var dataToSend = buffer;
if (err) {
Logging.warn(Sender.TAG, err);
dataToSend = payload; // something went wrong so send without gzip
options.headers["Content-Length"] = payload.length.toString();
} else {
options.headers["Content-Encoding"] = "gzip";
options.headers["Content-Length"] = buffer.length.toString();
}
Logging.info(Sender.TAG, options);
// Ensure this request is not captured by auto-collection.
(<any>options)[AutoCollectHttpDependencies.disableCollectionRequestOption] = true;
let startTime = +new Date();
var requestCallback = (res: http.ClientResponse) => {
res.setEncoding("utf-8");
//returns empty if the data is accepted
var responseString = "";
res.on("data", (data: string) => {
responseString += data;
});
res.on("end", () => {
let endTime = +new Date();
let duration = endTime - startTime;
this._numConsecutiveFailures = 0;
if (this._enableDiskRetryMode) {
// try to send any cached events if the user is back online
if (res.statusCode === 200) {
if (!this._resendTimer) {
this._resendTimer = setTimeout(() => {
this._resendTimer = null;
this._sendFirstFileOnDisk()
}, this._resendInterval);
this._resendTimer.unref();
}
} else if (this._isRetriable(res.statusCode)) {
try {
if (this._statsbeat) {
this._statsbeat.countRetry(Constants.StatsbeatNetworkCategory.Breeze, endpointHost);
if (res.statusCode === 429) {
this._statsbeat.countThrottle(Constants.StatsbeatNetworkCategory.Breeze, endpointHost);
}
}
const breezeResponse = JSON.parse(responseString) as Contracts.BreezeResponse;
let filteredEnvelopes: Contracts.EnvelopeTelemetry[] = [];
if (breezeResponse.errors) {
breezeResponse.errors.forEach(error => {
if (this._isRetriable(error.statusCode)) {
filteredEnvelopes.push(envelopes[error.index]);
}
});
if (filteredEnvelopes.length > 0) {
this._storeToDisk(filteredEnvelopes);
}
}
}
catch (ex) {
this._storeToDisk(envelopes); // Retriable status code with not valid Breeze response
}
}
}
// Redirect handling
if (res.statusCode === 307 || // Temporary Redirect
res.statusCode === 308) { // Permanent Redirect
this._numConsecutiveRedirects++;
// To prevent circular redirects
if (this._numConsecutiveRedirects < 10) {
// Try to get redirect header
const locationHeader = res.headers["location"] ? res.headers["location"].toString() : null;
if (locationHeader) {
this._redirectedHost = locationHeader;
// Send to redirect endpoint as HTTPs library doesn't handle redirect automatically
this.send(envelopes, callback);
}
}
else {
if (this._statsbeat) {
this._statsbeat.countException(Constants.StatsbeatNetworkCategory.Breeze, endpointHost);
}
if (typeof callback === "function") {
callback("Error sending telemetry because of circular redirects.");
}
}
}
else {
if (this._statsbeat) {
this._statsbeat.countRequest(Constants.StatsbeatNetworkCategory.Breeze, endpointHost, duration, res.statusCode === 200);
}
this._numConsecutiveRedirects = 0;
if (typeof callback === "function") {
callback(responseString);
}
Logging.info(Sender.TAG, responseString);
if (typeof this._onSuccess === "function") {
this._onSuccess(responseString);
}
}
});
};
var req = Util.makeRequest(this._config, endpointUrl, options, requestCallback);
req.on("error", (error: Error) => {
// todo: handle error codes better (group to recoverable/non-recoverable and persist)
this._numConsecutiveFailures++;
if (this._statsbeat) {
this._statsbeat.countException(Constants.StatsbeatNetworkCategory.Breeze, endpointHost);
}
// Only use warn level if retries are disabled or we've had some number of consecutive failures sending data
// This is because warn level is printed in the console by default, and we don't want to be noisy for transient and self-recovering errors
// Continue informing on each failure if verbose logging is being used
if (!this._enableDiskRetryMode || this._numConsecutiveFailures > 0 && this._numConsecutiveFailures % Sender.MAX_CONNECTION_FAILURES_BEFORE_WARN === 0) {
let notice = "Ingestion endpoint could not be reached. This batch of telemetry items has been lost. Use Disk Retry Caching to enable resending of failed telemetry. Error:";
if (this._enableDiskRetryMode) {
notice = `Ingestion endpoint could not be reached ${this._numConsecutiveFailures} consecutive times. There may be resulting telemetry loss. Most recent error:`;
}
Logging.warn(Sender.TAG, notice, Util.dumpObj(error));
} else {
let notice = "Transient failure to reach ingestion endpoint. This batch of telemetry items will be retried. Error:";
Logging.info(Sender.TAG, notice, Util.dumpObj(error));
}
this._onErrorHelper(error);
if (typeof callback === "function") {
if (error) {
callback(Util.dumpObj(error));
}
else {
callback("Error sending telemetry");
}
}
if (this._enableDiskRetryMode) {
this._storeToDisk(envelopes);
}
});
req.write(dataToSend);
req.end();
});
}
}