in source/connection_monitor.c [18:161]
static void s_process_statistics(
struct aws_crt_statistics_handler *handler,
struct aws_crt_statistics_sample_interval *interval,
struct aws_array_list *stats_list,
void *context) {
(void)interval;
struct aws_statistics_handler_http_connection_monitor_impl *impl = handler->impl;
if (!aws_http_connection_monitoring_options_is_valid(&impl->options)) {
return;
}
uint64_t pending_read_interval_ms = 0;
uint64_t pending_write_interval_ms = 0;
uint64_t bytes_read = 0;
uint64_t bytes_written = 0;
uint32_t current_outgoing_stream_id = 0;
uint32_t current_incoming_stream_id = 0;
/*
* Pull out the data needed to perform the throughput calculation
*/
size_t stats_count = aws_array_list_length(stats_list);
for (size_t i = 0; i < stats_count; ++i) {
struct aws_crt_statistics_base *stats_base = NULL;
if (aws_array_list_get_at(stats_list, &stats_base, i)) {
continue;
}
switch (stats_base->category) {
case AWSCRT_STAT_CAT_SOCKET: {
struct aws_crt_statistics_socket *socket_stats = (struct aws_crt_statistics_socket *)stats_base;
bytes_read = socket_stats->bytes_read;
bytes_written = socket_stats->bytes_written;
break;
}
case AWSCRT_STAT_CAT_HTTP1_CHANNEL: {
struct aws_crt_statistics_http1_channel *http1_stats =
(struct aws_crt_statistics_http1_channel *)stats_base;
pending_read_interval_ms = http1_stats->pending_incoming_stream_ms;
pending_write_interval_ms = http1_stats->pending_outgoing_stream_ms;
current_outgoing_stream_id = http1_stats->current_outgoing_stream_id;
current_incoming_stream_id = http1_stats->current_incoming_stream_id;
break;
}
default:
break;
}
}
if (impl->options.statistics_observer_fn) {
impl->options.statistics_observer_fn(
(size_t)(uintptr_t)(context), stats_list, impl->options.statistics_observer_user_data);
}
struct aws_channel *channel = context;
uint64_t bytes_per_second = 0;
uint64_t max_pending_io_interval_ms = 0;
if (pending_write_interval_ms > 0) {
double fractional_bytes_written_per_second =
(double)bytes_written * (double)AWS_TIMESTAMP_MILLIS / (double)pending_write_interval_ms;
if (fractional_bytes_written_per_second >= (double)UINT64_MAX) {
bytes_per_second = UINT64_MAX;
} else {
bytes_per_second = (uint64_t)fractional_bytes_written_per_second;
}
max_pending_io_interval_ms = pending_write_interval_ms;
}
if (pending_read_interval_ms > 0) {
double fractional_bytes_read_per_second =
(double)bytes_read * (double)AWS_TIMESTAMP_MILLIS / (double)pending_read_interval_ms;
if (fractional_bytes_read_per_second >= (double)UINT64_MAX) {
bytes_per_second = UINT64_MAX;
} else {
bytes_per_second = aws_add_u64_saturating(bytes_per_second, (uint64_t)fractional_bytes_read_per_second);
}
if (pending_read_interval_ms > max_pending_io_interval_ms) {
max_pending_io_interval_ms = pending_read_interval_ms;
}
}
AWS_LOGF_DEBUG(
AWS_LS_IO_CHANNEL,
"id=%p: channel throughput - %" PRIu64 " bytes per second",
(void *)channel,
bytes_per_second);
/*
* Check throughput only if at least one stream exists and was observed in that role previously
*
* ToDo: This logic only makes sense from an h1 perspective. A similar requirement could be placed on
* h2 stats by analyzing/tracking the min and max stream ids (per odd/even) at process timepoints.
*/
bool check_throughput =
(current_incoming_stream_id != 0 && current_incoming_stream_id == impl->last_incoming_stream_id) ||
(current_outgoing_stream_id != 0 && current_outgoing_stream_id == impl->last_outgoing_stream_id);
impl->last_outgoing_stream_id = current_outgoing_stream_id;
impl->last_incoming_stream_id = current_incoming_stream_id;
impl->last_measured_throughput = bytes_per_second;
if (!check_throughput) {
AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: channel throughput does not need to be checked", (void *)channel);
impl->throughput_failure_time_ms = 0;
return;
}
if (bytes_per_second >= impl->options.minimum_throughput_bytes_per_second) {
impl->throughput_failure_time_ms = 0;
return;
}
impl->throughput_failure_time_ms =
aws_add_u64_saturating(impl->throughput_failure_time_ms, max_pending_io_interval_ms);
AWS_LOGF_INFO(
AWS_LS_IO_CHANNEL,
"id=%p: Channel low throughput warning. Currently %" PRIu64 " milliseconds of consecutive failure time",
(void *)channel,
impl->throughput_failure_time_ms);
uint64_t maximum_failure_time_ms = aws_timestamp_convert(
impl->options.allowable_throughput_failure_interval_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_MILLIS, NULL);
if (impl->throughput_failure_time_ms <= maximum_failure_time_ms) {
return;
}
AWS_LOGF_INFO(
AWS_LS_IO_CHANNEL,
"id=%p: Channel low throughput threshold exceeded (< %" PRIu64
" bytes per second for more than %u seconds). Shutting down.",
(void *)channel,
impl->options.minimum_throughput_bytes_per_second,
impl->options.allowable_throughput_failure_interval_seconds);
aws_channel_shutdown(channel, AWS_ERROR_HTTP_CHANNEL_THROUGHPUT_FAILURE);
}