in hf_xet/src/log_buffer.rs [199:244]
/// Serializes one tracing event as a JSON-encoded `User-Agent` header map and appends it to
/// the shared log buffer.
///
/// The header value starts from `self.telemetry_versions` and gains one `name/value; ` pair
/// per field recorded on the event (values use their `Debug` form with every `"` removed).
///
/// This method never panics on its own account and never propagates an error: an invalid
/// header value, a serialization failure, a poisoned buffer lock, or insufficient buffer space
/// all cause the record to be dropped and counted in `self.stats` (`records_refused`, plus
/// `bytes_refused` once the serialized size is known). Successful writes are counted in
/// `records_written` / `bytes_written`.
fn on_event(&self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
    // Build the header value *before* taking the buffer lock: recording fields runs arbitrary
    // `Debug` impls, which may be slow or may panic, and must neither stall other writers nor
    // poison the buffer mutex.
    let mut user_agent = self.telemetry_versions.clone();
    let mut visitor = |field: &tracing::field::Field, value: &dyn std::fmt::Debug| {
        user_agent.push_str(&format!("{}/{:?}; ", field.name(), value));
    };
    event.record(&mut visitor);
    // `Debug` output of strings is quoted; strip the quotes in place (no second allocation).
    user_agent.retain(|c| c != '"');

    let header_value = match HeaderValue::from_str(&user_agent) {
        Ok(value) => value,
        Err(_) => {
            // The assembled string is not a legal header value; drop the record.
            self.stats.records_refused.fetch_add(1, Ordering::Relaxed);
            return;
        }
    };
    let mut http_headers = HeaderMap::new();
    http_headers.insert("User-Agent", header_value);

    let serializable: SerializableHeaders = (&http_headers).into();
    let serialized_headers = match serde_json::to_string(&serializable) {
        Ok(json) => json,
        Err(_) => {
            self.stats.records_refused.fetch_add(1, Ordering::Relaxed);
            return;
        }
    };
    let record_len = serialized_headers.len();

    // Only now take the lock, so it is held just for reserve / copy / commit.
    let mut buffer = match self.buffer.lock() {
        Ok(guard) => guard,
        Err(_) => {
            // A previous holder panicked. Logging must not take the caller down with it, so
            // treat the record as refused instead of unwrapping the poisoned lock.
            self.stats.records_refused.fetch_add(1, Ordering::Relaxed);
            self.stats
                .bytes_refused
                .fetch_add(record_len as u64, Ordering::Relaxed);
            return;
        }
    };

    match buffer.reserve(record_len) {
        Ok(reserved) => {
            if reserved.len() < record_len {
                // Log goes to /dev/null if there is not enough free space: commit zero bytes
                // so that nothing from the short reservation is published.
                self.stats.records_refused.fetch_add(1, Ordering::Relaxed);
                self.stats
                    .bytes_refused
                    .fetch_add(record_len as u64, Ordering::Relaxed);
                buffer.commit(0);
            } else {
                self.stats.records_written.fetch_add(1, Ordering::Relaxed);
                self.stats
                    .bytes_written
                    .fetch_add(record_len as u64, Ordering::Relaxed);
                reserved[..record_len].copy_from_slice(serialized_headers.as_bytes());
                buffer.commit(record_len);
            }
        }
        Err(_) => {
            // No reservation could be made at all; nothing to commit.
            self.stats.records_refused.fetch_add(1, Ordering::Relaxed);
            self.stats
                .bytes_refused
                .fetch_add(record_len as u64, Ordering::Relaxed);
        }
    }
}