in http/get_simple/rs/client/src/main.rs [26:106]
fn main() {
// Configure tracing subscriber.
tracing_subscriber::fmt()
.with_span_events(FmtSpan::CLOSE)
.init();
info_span!("get_simple").in_scope(|| {
// Connect to server.
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8008);
match TcpStream::connect(addr) {
Ok(mut stream) => {
info_span!("Reading Arrow IPC stream", %addr).in_scope(|| {
info!("Connected");
// Send request.
stream
.write_all(format!("GET / HTTP/1.1\r\nHost: {addr}\r\n\r\n").as_bytes())
.unwrap();
// Ignore response header.
let mut reader = BufReader::new(&mut stream);
let mut chunked = false;
loop {
let mut line = String::default();
reader.read_line(&mut line).unwrap();
if let Some(("transfer-encoding", "chunked")) = line
.to_lowercase()
.split_once(':')
.map(|(key, value)| (key.trim(), value.trim()))
{
chunked = true;
}
if line == "\r\n" {
break;
}
}
// Read Arrow IPC stream
let batches: Vec<_> = if chunked {
let mut buffer = Vec::default();
loop {
// Chunk size
let mut line = String::default();
reader.read_line(&mut line).unwrap();
let chunk_size = u64::from_str_radix(line.trim(), 16).unwrap();
if chunk_size == 0 {
// Terminating chunk
break;
} else {
// Append chunk to buffer
let mut chunk_reader = reader.take(chunk_size);
chunk_reader.read_to_end(&mut buffer).unwrap();
// Terminating CR-LF sequence
reader = chunk_reader.into_inner();
reader.read_line(&mut String::default()).unwrap();
}
}
StreamReader::try_new_unbuffered(buffer.as_slice(), None)
.unwrap()
.flat_map(Result::ok)
.collect()
} else {
StreamReader::try_new_unbuffered(reader, None)
.unwrap()
.flat_map(Result::ok)
.collect()
};
info!(
batches = batches.len(),
rows = batches.iter().map(|rb| rb.num_rows()).sum::<usize>()
);
});
}
Err(error) => {
error!(%error, "Connection failed")
}
}
})
}