diff --git a/fuzzers/config.sh b/fuzzers/config.sh index a362261956..a9d7841779 100755 --- a/fuzzers/config.sh +++ b/fuzzers/config.sh @@ -23,7 +23,7 @@ COVERAGE_LDFLAGS="-O1 -fno-omit-frame-pointer -g -ggdb3 -fprofile-instr-generate JANUS_CONF_FLAGS="--disable-docs --disable-post-processing --disable-turn-rest-api --disable-all-transports --disable-all-plugins --disable-all-handlers --disable-data-channels" # Janus objects needed for fuzzing -JANUS_OBJECTS="janus-log.o janus-utils.o janus-rtcp.o janus-rtp.o janus-sdp-utils.o" +JANUS_OBJECTS="janus-log.o janus-utils.o janus-rtcp.o janus-rtp.o janus-sdp-utils.o janus-bwe.o janus-ip-utils.o" # CFLAGS for fuzzer dependencies DEPS_CFLAGS="$(pkg-config --cflags glib-2.0)" diff --git a/fuzzers/rtcp_fuzzer.c b/fuzzers/rtcp_fuzzer.c index c742f1db93..dc9273714f 100644 --- a/fuzzers/rtcp_fuzzer.c +++ b/fuzzers/rtcp_fuzzer.c @@ -61,8 +61,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { janus_rtcp_cap_remb((char *)copy_data[idx++], size, 256000); janus_rtcp_swap_report_blocks((char *)copy_data[idx++], size, 2); janus_rtcp_fix_report_data((char *)copy_data[idx++], size, 2000, 1000, 2, 2, 2, TRUE); - janus_rtcp_fix_ssrc(&ctx0, (char *)copy_data[idx++], size, 1, 2, 2); - janus_rtcp_parse(&ctx1, (char *)copy_data[idx++], size); + janus_rtcp_fix_ssrc(&ctx0, NULL, (char *)copy_data[idx++], size, 1, 2, 2); + janus_rtcp_parse(&ctx1, NULL, (char *)copy_data[idx++], size); janus_rtcp_remove_nacks((char *)copy_data[idx++], size); /* Functions that allocate new memory */ char *output_data = janus_rtcp_filter((char *)data, size, &newlen); diff --git a/src/Makefile.am b/src/Makefile.am index 67e5dff5d6..a32e1423b6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -4,7 +4,7 @@ CLEANFILES = $(NULL) bin_PROGRAMS = janus headerdir = $(includedir)/janus -header_HEADERS = apierror.h config.h log.h debug.h mutex.h record.h \ +header_HEADERS = apierror.h config.h log.h debug.h mutex.h record.h bwe.h \ rtcp.h rtp.h rtpsrtp.h sdp-utils.h ip-utils.h utils.h refcount.h text2pcap.h pluginsheaderdir = $(includedir)/janus/plugins @@ -72,6 +72,8 @@ janus_SOURCES = \ apierror.h \ auth.c \ auth.h \ + bwe.c \ + bwe.h \ config.c \ config.h \ debug.h \ diff --git a/src/bwe.c b/src/bwe.c new file mode 100644 index 0000000000..4c95c8fbd8 --- /dev/null +++ b/src/bwe.c @@ -0,0 +1,456 @@ +/*! \file bwe.h + * \author Lorenzo Miniero + * \copyright GNU General Public License v3 + * \brief Bandwidth estimation tools + * \details Implementation of a basic bandwidth estimator for outgoing + * RTP flows, based on Transport Wide CC and a few other utilities. + * + * \ingroup protocols + * \ref protocols + */ + +#include +#include +#include +#include +#include + +#include "bwe.h" +#include "debug.h" +#include "utils.h" +#include "ip-utils.h" + + +const char *janus_bwe_twcc_status_description(janus_bwe_twcc_status status) { + switch(status) { + case janus_bwe_twcc_status_notreceived: + return "notreceived"; + case janus_bwe_twcc_status_smalldelta: + return "smalldelta"; + case janus_bwe_twcc_status_largeornegativedelta: + return "largeornegativedelta"; + case janus_bwe_twcc_status_reserved: + return "reserved"; + default: break; + } + return NULL; +} + +const char *janus_bwe_status_description(janus_bwe_status status) { + switch(status) { + case janus_bwe_status_start: + return "start"; + case janus_bwe_status_regular: + return "regular"; + case janus_bwe_status_lossy: + return "lossy"; + case janus_bwe_status_congested: + return "congested"; + case janus_bwe_status_recovering: + return "recovering"; + default: break; + } + return NULL; +} + +static void janus_bwe_twcc_inflight_destroy(janus_bwe_twcc_inflight *stat) { + g_free(stat); +} + +janus_bwe_context *janus_bwe_context_create(void) { + janus_bwe_context *bwe = g_malloc0(sizeof(janus_bwe_context)); + /* FIXME */ + bwe->packets = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_bwe_twcc_inflight_destroy); + bwe->sent = janus_bwe_stream_bitrate_create(); + bwe->acked = janus_bwe_stream_bitrate_create(); + bwe->delays = janus_bwe_delay_tracker_create(0); + bwe->probing_mindex = -1; + bwe->fd = -1; + return bwe; +} + +void janus_bwe_context_destroy(janus_bwe_context *bwe) { + if(bwe) { + /* FIXME clean everything up */ + g_hash_table_destroy(bwe->packets); + janus_bwe_stream_bitrate_destroy(bwe->sent); + janus_bwe_stream_bitrate_destroy(bwe->acked); + janus_bwe_delay_tracker_destroy(bwe->delays); + if(bwe->csv != NULL) + fclose(bwe->csv); + if(bwe->fd > -1) + close(bwe->fd); + g_free(bwe); + } +} + +gboolean janus_bwe_context_add_inflight(janus_bwe_context *bwe, + uint16_t seq, int64_t sent, janus_bwe_packet_type type, int size) { + if(bwe == NULL) + return FALSE; + int64_t now = janus_get_monotonic_time(); + if(bwe->started == 0) + bwe->started = now; + if(bwe->status == janus_bwe_status_start && now - bwe->started >= G_USEC_PER_SEC) { + /* Let's move from the starting phase to the regular stage */ + bwe->status = janus_bwe_status_regular; + bwe->status_changed = now; + bwe->last_notified = janus_get_monotonic_time(); + } + janus_bwe_twcc_inflight *stat = g_malloc(sizeof(janus_bwe_twcc_inflight)); + stat->seq = seq; + stat->sent_ts = sent; + stat->delta_us = bwe->last_sent_ts ? (sent - bwe->last_sent_ts) : 0; + bwe->last_sent_ts = sent; + stat->type = type; + stat->size = size; + janus_bwe_stream_bitrate_update(bwe->sent, now, type, 0, size); + g_hash_table_insert(bwe->packets, GUINT_TO_POINTER(seq), stat); + return TRUE; +} + +void janus_bwe_context_handle_feedback(janus_bwe_context *bwe, + uint16_t seq, janus_bwe_twcc_status status, int64_t delta_us, gboolean first) { + if(bwe == NULL) + return; + /* Find the inflight information we stored when sending this packet */ + janus_bwe_twcc_inflight *p = g_hash_table_lookup(bwe->packets, GUINT_TO_POINTER(seq)); + if(p == NULL) { + JANUS_LOG(LOG_WARN, "[BWE] [%"SCNu16"] not found in inflight packets table\n", seq); + return; + } + /* The first recv delta is relative to the reference time, not to the previous packet */ + if(!first) { + int64_t send_delta_us = 0; + if(seq == bwe->last_recv_seq + 1) { + send_delta_us = p->delta_us; + } else { + janus_bwe_twcc_inflight *prev_p = g_hash_table_lookup(bwe->packets, GUINT_TO_POINTER(bwe->last_recv_seq)); + if(prev_p != NULL) { + send_delta_us = p->sent_ts - prev_p->sent_ts; + } else { + JANUS_LOG(LOG_WARN, "[BWE] [%"SCNu16"] not found in inflight packets table\n", bwe->last_recv_seq); + } + } + int64_t rounded_delta_us = (send_delta_us / 250) * 250; + int64_t diff_us = delta_us - rounded_delta_us; + bwe->delay += diff_us; + JANUS_LOG(LOG_HUGE, "[BWE] [%"SCNu16"] %s (%"SCNi64"us) (send: %"SCNi64"us) diff_us=%"SCNi64"\n", seq, + janus_bwe_twcc_status_description(status), delta_us, rounded_delta_us, diff_us); + } + if(status != janus_bwe_twcc_status_notreceived) { + janus_bwe_stream_bitrate_update(bwe->acked, janus_get_monotonic_time(), p->type, 0, p->size); + bwe->received_pkts++; + bwe->last_recv_seq = seq; + } else { + bwe->lost_pkts++; + } +} + +void janus_bwe_context_update(janus_bwe_context *bwe) { + if(bwe == NULL) + return; + /* Reset the outgoing and (acked) incoming bitrate, and estimate the bitrate */ + int64_t now = janus_get_monotonic_time(); + if(bwe->bitrate_ts == 0) + bwe->bitrate_ts = now; + gboolean notify_plugin = FALSE; + /* Clean up old bitrate values, and get the current bitrates */ + janus_bwe_stream_bitrate_update(bwe->sent, now, janus_bwe_packet_type_regular, 0, 0); + janus_bwe_stream_bitrate_update(bwe->sent, now, janus_bwe_packet_type_rtx, 0, 0); + janus_bwe_stream_bitrate_update(bwe->sent, now, janus_bwe_packet_type_probing, 0, 0); + janus_bwe_stream_bitrate_update(bwe->acked, now, janus_bwe_packet_type_regular, 0, 0); + janus_bwe_stream_bitrate_update(bwe->acked, now, janus_bwe_packet_type_rtx, 0, 0); + janus_bwe_stream_bitrate_update(bwe->acked, now, janus_bwe_packet_type_probing, 0, 0); + uint32_t rtx_out = bwe->sent->packets[janus_bwe_packet_type_rtx*3] ? bwe->sent->bitrate[janus_bwe_packet_type_rtx*3] : 0; + uint32_t probing_out = bwe->sent->packets[janus_bwe_packet_type_probing*3] ? bwe->sent->bitrate[janus_bwe_packet_type_probing*3] : 0; + uint32_t bitrate_out = rtx_out + probing_out + (bwe->sent->packets[0] ? bwe->sent->bitrate[0] : 0); + uint32_t rtx_in = bwe->acked->packets[janus_bwe_packet_type_rtx*3] ? bwe->acked->bitrate[janus_bwe_packet_type_rtx*3] : 0; + uint32_t probing_in = bwe->acked->packets[janus_bwe_packet_type_probing*3] ? bwe->acked->bitrate[janus_bwe_packet_type_probing*3] : 0; + uint32_t bitrate_in = rtx_in + probing_in + (bwe->acked->packets[0] ? bwe->acked->bitrate[0] : 0); + /* Get the average delay */ + double avg_delay_latest = ((double)bwe->delay / (double)bwe->received_pkts) / 1000; + janus_bwe_delay_tracker_update(bwe->delays, now, avg_delay_latest); + int dts = bwe->delays->queue ? g_queue_get_length(bwe->delays->queue) : 0; + if(dts == 0) + dts = 1; + double avg_delay = bwe->delays->sum / (double)dts; + double avg_delay_weighted = (bwe->bitrate_ts == now ? avg_delay : ((bwe->avg_delay * 0.9) + (avg_delay * 0.1))); + /* FIXME Estimate the bandwidth */ + uint32_t estimate = bitrate_in; + uint16_t tot = bwe->received_pkts + bwe->lost_pkts; + if(tot > 0) + bwe->loss_ratio = (double)bwe->lost_pkts / (double)tot; + /* Check if there's packet loss or congestion */ + if(bwe->loss_ratio > 0.05) { + /* FIXME Lossy network? Set the estimate to the acknowledged bitrate */ + if(bwe->status != janus_bwe_status_lossy && bwe->status != janus_bwe_status_congested) + notify_plugin = TRUE; + bwe->status = janus_bwe_status_lossy; + bwe->status_changed = now; + bwe->estimate = estimate; + } else if(avg_delay_weighted >= 1.0 && (avg_delay_weighted - bwe->avg_delay) >= 0.05) { + JANUS_LOG(LOG_WARN, "[BWE][%"SCNi64"] Congested (delay=%.2f, increase=%.2f)\n", + now, avg_delay_weighted, avg_delay_weighted - bwe->avg_delay); + /* FIXME Delay is increasing */ + if(bwe->status != janus_bwe_status_lossy && bwe->status != janus_bwe_status_congested) + notify_plugin = TRUE; + bwe->status = janus_bwe_status_congested; + bwe->status_changed = now; + bwe->estimate = estimate; + //~ else + //~ bwe->estimate = ((double)bwe->estimate * 0.8) + ((double)estimate * 0.2); + //~ } else if(bitrate_out > bitrate_in && (bitrate_out - bitrate_in > 50000)) { + //~ /* FIXME We sent much more than what was acked, another indicator of possible congestion? */ + //~ JANUS_LOG(LOG_WARN, "[BWE][%"SCNi64"] Congested (too much diff with acked rate, %"SCNu32")\n", + //~ now, bitrate_out - bitrate_in); + //~ if(bwe->status != janus_bwe_status_lossy && bwe->status != janus_bwe_status_congested) + //~ notify_plugin = TRUE; + //~ bwe->status = janus_bwe_status_congested; + //~ bwe->status_changed = now; + //~ bwe->estimate = estimate; + } else { + /* FIXME All is fine? Check what state we're in */ + if(bwe->status == janus_bwe_status_lossy || bwe->status == janus_bwe_status_congested) { + bwe->status = janus_bwe_status_recovering; + bwe->status_changed = now; + } + if(bwe->status == janus_bwe_status_recovering) { + /* FIXME Still recovering */ + if(now - bwe->status_changed >= 5*G_USEC_PER_SEC) { + /* FIXME Recovery ended, let's assume everything is fine now */ + bwe->status = janus_bwe_status_regular; + bwe->status_changed = now; + /* Restore probing but incrementally */ + bwe->probing_sent = 0; + bwe->probing_portion = 0.0; + bwe->probing_buildup = 0; + bwe->probing_buildup_step = 1000; + bwe->probing_buildup_timer = now; + } else { + /* FIXME Keep converging to the estimate */ + if(estimate > bwe->estimate) + bwe->estimate = estimate; + //~ else + //~ bwe->estimate = ((double)bwe->estimate * 0.8) + ((double)estimate * 0.2); + } + } + if(bwe->status == janus_bwe_status_regular) { + /* FIXME Slowly increase */ + if(estimate > bwe->estimate) + bwe->estimate = estimate; + //~ else if(now - bwe->status_changed < 10*G_USEC_PER_SEC) + //~ bwe->estimate = ((double)bwe->estimate * 1.02); + } + } + bwe->avg_delay = avg_delay_weighted; + bwe->bitrate_ts = now; + JANUS_LOG(LOG_DBG, "[BWE][%"SCNi64"][%s] sent=%"SCNu32"kbps (probing=%"SCNu32"kbps), acked=%"SCNu32"kbps (probing=%"SCNu32"kbps), loss=%.2f%%, avg_delay=%.2fms, estimate=%"SCNu32"\n", + now, janus_bwe_status_description(bwe->status), + bitrate_out / 1024, probing_out / 1024, bitrate_in / 1024, probing_in / 1024, + bwe->loss_ratio, bwe->avg_delay, bwe->estimate); + + /* Save the details to CSV and/or send them externally via UDP, if enabled */ + if(bwe->csv != NULL || bwe->fd > -1) { + char line[2048]; + g_snprintf(line, sizeof(line), "%"SCNi64",%d,%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu16",%"SCNu16",%.2f,%.2f,%.2f,%.2f\n", + now - bwe->started, bwe->status, + bwe->estimate, bwe->probing_target, bitrate_out, rtx_out, probing_out, bitrate_in, rtx_in, probing_in, + bwe->received_pkts, bwe->lost_pkts, bwe->loss_ratio, avg_delay, avg_delay_weighted, avg_delay_latest); + if(bwe->csv != NULL) + fwrite(line, sizeof(char), strlen(line), bwe->csv); + if(bwe->fd > -1) + send(bwe->fd, line, strlen(line), 0); + } + + /* Reset values */ + bwe->delay = 0; + bwe->received_pkts = 0; + bwe->lost_pkts = 0; + /* Check if we should notify the plugin about the estimate */ + if(bwe->status != janus_bwe_status_start && (notify_plugin || (now - bwe->last_notified) >= 250000)) { + bwe->notify_plugin = TRUE; + bwe->last_notified = now; + } +} + +gboolean janus_bwe_save_csv(janus_bwe_context *bwe, const char *path) { + if(bwe == NULL || path == NULL || bwe->csv != NULL) + return FALSE; + /* Open the CSV file */ + bwe->csv = fopen(path, "wt"); + if(bwe->csv == NULL) { + JANUS_LOG(LOG_ERR, "Couldn't open CSV file for BWE stats: %s\n", g_strerror(errno)); + return FALSE; + } + /* Write a header line with the names of the fields we'll save */ + char line[2048]; + g_snprintf(line, sizeof(line), "time,status,estimate,probing_target,bitrate_out,rtx_out,probing_out,bitrate_in,rtx_in,probing_in,acked,lost,loss_ratio,avg_delay,avg_delay_weighted,avg_delay_fb\n"); + fwrite(line, sizeof(char), strlen(line), bwe->csv); + fflush(bwe->csv); + /* Done */ + return TRUE; +} + +void janus_bwe_close_csv(janus_bwe_context *bwe) { + if(bwe == NULL || bwe->csv == NULL) + return; + fclose(bwe->csv); + bwe->csv = NULL; +} + +gboolean janus_bwe_save_live(janus_bwe_context *bwe, const char *host, uint16_t port) { + if(bwe == NULL || host == NULL || port == 0 || bwe->fd > -1) + return FALSE; + /* Check if we need to resolve this host address */ + struct addrinfo *res = NULL, *start = NULL; + janus_network_address addr; + janus_network_address_string_buffer addr_buf; + const char *resolved_host = NULL; + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + if(getaddrinfo(host, NULL, NULL, &res) == 0) { + start = res; + while(res != NULL) { + if(janus_network_address_from_sockaddr(res->ai_addr, &addr) == 0 && + janus_network_address_to_string_buffer(&addr, &addr_buf) == 0) { + /* Resolved */ + resolved_host = janus_network_address_string_from_buffer(&addr_buf); + freeaddrinfo(start); + start = NULL; + break; + } + res = res->ai_next; + } + } + if(resolved_host == NULL) { + if(start) + freeaddrinfo(start); + JANUS_LOG(LOG_ERR, "Could not resolve address (%s) for BWE stats...\n", host); + return FALSE; + } + host = resolved_host; + /* Create the socket */ + bwe->fd = socket(addr.family, SOCK_DGRAM, IPPROTO_UDP); + if(bwe->fd == -1) { + JANUS_LOG(LOG_ERR, "Error creating socket for BWE stats: %s\n", g_strerror(errno)); + return FALSE; + } + struct sockaddr_in serv_addr = { 0 }; + struct sockaddr_in6 serv_addr6 = { 0 }; + if(addr.family == AF_INET6) { + serv_addr6.sin6_family = AF_INET6; + inet_pton(AF_INET6, host, &serv_addr6.sin6_addr); + serv_addr6.sin6_port = htons(port); + } else { + serv_addr.sin_family = AF_INET; + inet_pton(AF_INET, host, &serv_addr.sin_addr); + serv_addr.sin_port = htons(port); + } + /* Connect the socket to the provided address */ + struct sockaddr *address = (addr.family == AF_INET ? (struct sockaddr *)&serv_addr : (struct sockaddr *)&serv_addr6); + size_t addrlen = (addr.family == AF_INET ? sizeof(serv_addr) : sizeof(serv_addr6)); + if(connect(bwe->fd, (struct sockaddr *)address, addrlen) < 0) { + JANUS_LOG(LOG_ERR, "Error connecting socket for BWE stats: %s\n", g_strerror(errno)); + close(bwe->fd); + bwe->fd = -1; + } + /* Done */ + return TRUE; +} + +void janus_bwe_close_live(janus_bwe_context *bwe) { + if(bwe == NULL || bwe->fd == -1) + return; + close(bwe->fd); + bwe->fd = -1; +} + +janus_bwe_stream_bitrate *janus_bwe_stream_bitrate_create(void) { + janus_bwe_stream_bitrate *bwe_sb = g_malloc0(sizeof(janus_bwe_stream_bitrate)); + janus_mutex_init(&bwe_sb->mutex); + return bwe_sb; +} + +void janus_bwe_stream_bitrate_update(janus_bwe_stream_bitrate *bwe_sb, int64_t when, int sl, int tl, int size) { + if(bwe_sb == NULL || sl < 0 || sl > 2 || tl > 2) + return; + if(tl < 0) + tl = 0; + int i = 0; + int64_t cleanup_ts = when - G_USEC_PER_SEC; + janus_mutex_lock(&bwe_sb->mutex); + for(i=tl; i<3; i++) { + if(i <= tl && bwe_sb->packets[sl*3 + i] == NULL) + bwe_sb->packets[sl*3 + i] = g_queue_new(); + if(bwe_sb->packets[sl*3 + i] == NULL) + continue; + /* Check if we need to get rid of some old packets */ + janus_bwe_stream_packet *sp = g_queue_peek_head(bwe_sb->packets[sl*3 + i]); + while(sp && sp->sent_ts < cleanup_ts) { + sp = g_queue_pop_head(bwe_sb->packets[sl*3 + i]); + if(bwe_sb->bitrate[sl*3 + i] >= sp->size) + bwe_sb->bitrate[sl*3 + i] -= sp->size; + g_free(sp); + sp = g_queue_peek_head(bwe_sb->packets[sl*3 + i]); + } + /* Check if there's anything new we need to add now */ + if(size > 0) { + sp = g_malloc(sizeof(janus_bwe_stream_packet)); + sp->sent_ts = when; + sp->size = size*8; + bwe_sb->bitrate[sl*3 + i] += sp->size; + g_queue_push_tail(bwe_sb->packets[sl*3 + i], sp); + } + } + janus_mutex_unlock(&bwe_sb->mutex); +} + +void janus_bwe_stream_bitrate_destroy(janus_bwe_stream_bitrate *bwe_sb) { + if(bwe_sb == NULL) + return; + janus_mutex_lock(&bwe_sb->mutex); + for(int i=0; i<9; i++) { + if(bwe_sb->packets[i] != NULL) { + g_queue_free_full(bwe_sb->packets[i], (GDestroyNotify)g_free); + bwe_sb->packets[i] = NULL; + } + } + janus_mutex_unlock(&bwe_sb->mutex); + janus_mutex_destroy(&bwe_sb->mutex); + g_free(bwe_sb); +} + +janus_bwe_delay_tracker *janus_bwe_delay_tracker_create(int64_t keep_ts) { + janus_bwe_delay_tracker *dt = g_malloc0(sizeof(janus_bwe_delay_tracker)); + dt->keep_ts = (keep_ts > 0 ? keep_ts : G_USEC_PER_SEC); + return dt; +} + +void janus_bwe_delay_tracker_update(janus_bwe_delay_tracker *dt, int64_t when, double avg_delay) { + if(dt == NULL) + return; + if(dt->queue == NULL) + dt->queue = g_queue_new(); + /* Check if we need to get rid of some old feedback */ + int64_t cleanup_ts = when - dt->keep_ts; + janus_bwe_delay_fb *fb = g_queue_peek_head(dt->queue); + while(fb && fb->sent_ts < cleanup_ts) { + fb = g_queue_pop_head(dt->queue); + dt->sum -= fb->avg_delay; + g_free(fb); + fb = g_queue_peek_head(dt->queue); + } + /* Check if there's anything new we need to add now */ + fb = g_malloc(sizeof(janus_bwe_delay_fb)); + fb->sent_ts = when; + fb->avg_delay = avg_delay; + dt->sum += avg_delay; + g_queue_push_tail(dt->queue, fb); +} + +void janus_bwe_delay_tracker_destroy(janus_bwe_delay_tracker *dt) { + if(dt && dt->queue) + g_queue_free_full(dt->queue, (GDestroyNotify)g_free); + g_free(dt); +} diff --git a/src/bwe.h b/src/bwe.h new file mode 100644 index 0000000000..e77fa7f74e --- /dev/null +++ b/src/bwe.h @@ -0,0 +1,233 @@ +/*! \file bwe.h + * \author Lorenzo Miniero + * \copyright GNU General Public License v3 + * \brief Bandwidth estimation tools (headers) + * \details Implementation of a basic bandwidth estimator for outgoing + * RTP flows, based on Transport Wide CC and a few other utilities. + * + * \ingroup protocols + * \ref protocols + */ + +#ifndef JANUS_BWE_H +#define JANUS_BWE_H + +#include + +#include "mutex.h" + +/*! \brief Tracker for a stream bitrate (whether it's simulcast/SVC or not) */ +typedef struct janus_bwe_stream_bitrate { + /*! \brief Time based queue of packet sizes */ + GQueue *packets[9]; + /*! \brief Current bitrate */ + uint32_t bitrate[9]; + /*! \brief Mutex to lock this instance */ + janus_mutex mutex; +} janus_bwe_stream_bitrate; +/*! \brief Helper method to create a new janus_bwe_stream_bitrate instance + * @returns A janus_bwe_stream_bitrate instance, if successful, or NULL otherwise */ +janus_bwe_stream_bitrate *janus_bwe_stream_bitrate_create(void); +/*! \brief Helper method to update an existing janus_bwe_stream_bitrate instance with new data + * \note Passing \c -1 or \c 0 as size just updates the queue to get rid of older values + * @param[in] bwe_sb The janus_bwe_stream_bitrate instance to update + * @param[in] when Timestamp of the packet + * @param[in] sl Substream or spatial layer of the packet (can be 0 for audio) + * @param[in] sl Temporal layer of the packet (can be 0 for audio) + * @param[in] size Size of the packet */ +void janus_bwe_stream_bitrate_update(janus_bwe_stream_bitrate *bwe_sb, int64_t when, int sl, int tl, int size); +/*! \brief Helper method to destroy an existing janus_bwe_stream_bitrate instance + * @param[in] bwe_sb The janus_bwe_stream_bitrate instance to destroy */ +void janus_bwe_stream_bitrate_destroy(janus_bwe_stream_bitrate *bwe_sb); + +/*! \brief Packet size and time */ +typedef struct janus_bwe_stream_packet { + /*! \brief Timestamp */ + int64_t sent_ts; + /*! \brief Size of packet */ + uint16_t size; +} janus_bwe_stream_packet; + +/*! \brief Tracker for a stream bitrate (whether it's simulcast/SVC or not) */ +typedef struct janus_bwe_delay_tracker { + /*! \brief Time based queue of delays */ + GQueue *queue; + /*! \brief Current sum of average delays */ + double sum; + /*! \brief How long to keep items in queue (1s by default) */ + int64_t keep_ts; +} janus_bwe_delay_tracker; +/*! \brief Helper method to create a new janus_bwe_delay_tracker instance + * @note Passing 0 or a negative value for keep_ts will assume 1 second (G_USEC_PER_SEC) + * @param[im] keep_ts How long to keep items in queue + * @returns A janus_bwe_delay_tracker instance, if successful, or NULL otherwise */ +janus_bwe_delay_tracker *janus_bwe_delay_tracker_create(int64_t keep_ts); +/*! \brief Helper method to update an existing janus_bwe_delay_tracker instance with new data + * @param[in] dt The janus_bwe_delay_tracker instance to update + * @param[in] when Timestamp of the average delay + * @param[in] avg_delay Average delay */ +void janus_bwe_delay_tracker_update(janus_bwe_delay_tracker *dt, int64_t when, double avg_delay); +/*! \brief Helper method to destroy an existing janus_bwe_delay_tracker instance + * @param[in] dt The janus_bwe_delay_tracker instance to destroy */ +void janus_bwe_delay_tracker_destroy(janus_bwe_delay_tracker *dt); + +/*! \brief Instance of accumulated delay, from TWCC feedback */ +typedef struct janus_bwe_delay_fb { + /*! \brief Timestamp */ + int64_t sent_ts; + /*! \brief Average delay */ + double avg_delay; +} janus_bwe_delay_fb; + +/*! \brief Transport Wide CC statuses */ +typedef enum janus_bwe_twcc_status { + janus_bwe_twcc_status_notreceived = 0, + janus_bwe_twcc_status_smalldelta = 1, + janus_bwe_twcc_status_largeornegativedelta = 2, + janus_bwe_twcc_status_reserved = 3 +} janus_bwe_twcc_status; +/*! \brief Helper to return a string description of a TWCC status + * @param[in] status The janus_bwe_twcc_status status + * @returns A string description */ +const char *janus_bwe_twcc_status_description(janus_bwe_twcc_status status); + +/*! \brief Type of in-flight packet */ +typedef enum janus_bwe_packet_type { + /*! \brief Regular RTP packet */ + janus_bwe_packet_type_regular = 0, + /*! \brief RTC packet */ + janus_bwe_packet_type_rtx, + /*! \brief Probing */ + janus_bwe_packet_type_probing +} janus_bwe_packet_type; + +/*! \brief Tracking info for in-flight packet we're waiting TWCC feedback for */ +typedef struct janus_bwe_twcc_inflight { + /*! \brief The TWCC sequence number */ + uint16_t seq; + /*! \brief Monotonic time this packet was delivered */ + int64_t sent_ts; + /*! \brief Delta (in us) since the delivery of the previous packet */ + int64_t delta_us; + /*! \brief Type of packet (e.g., regular or rtx) */ + janus_bwe_packet_type type; + /*! \brief Size of the sent packet */ + int size; +} janus_bwe_twcc_inflight; + +/*! \brief Current status of the bandwidth estimator */ +typedef enum janus_bwe_status { + /* BWE just started */ + janus_bwe_status_start = 0, + /* BWE in the regular/increasing stage */ + janus_bwe_status_regular, + /* BWE detected too many losses */ + janus_bwe_status_lossy, + /* BWE detected congestion */ + janus_bwe_status_congested, + /* BWE recovering from losses/congestion */ + janus_bwe_status_recovering +} janus_bwe_status; +/*! \brief Helper to return a string description of a BWE status + * @param[in] status The janus_bwe_status status + * @returns A string description */ +const char *janus_bwe_status_description(janus_bwe_status status); + +/*! \brief Bandwidth estimation context */ +typedef struct janus_bwe_context { + /*! \brief Current status of the context */ + janus_bwe_status status; + /*! \brief Monotonic timestamp of when the BWE work started */ + int64_t started; + /*! \brief Monotonic timestamp of when the BWE status last changed */ + int64_t status_changed; + /*! \brief Index of the m-line we're using for probing */ + int probing_mindex; + /*! \brief How much we should aim for with out probing (and how much to increase, plus much we sent in a second) */ + uint32_t probing_target, probing_buildup, probing_buildup_step, probing_sent; + /*! \brief How many times we went through probing in a second */ + uint8_t probing_count; + /*! \brief Portion of probing we didn't manage to send the previous round */ + double probing_portion; + /*! \brief In case probing was deferred, when it shoult restart */ + int64_t probing_deferred; + /*! \brief Timer for building up probing */ + int64_t probing_buildup_timer; + /*! \brief Monotonic timestamp of the last sent packet */ + int64_t last_sent_ts; + /*! \brief Last twcc seq number of a received packet */ + uint16_t last_recv_seq; + /*! \brief Map of in-flight packets */ + GHashTable *packets; + /*! \brief Monotonic timestamp of when we last computed the bitrates */ + int64_t bitrate_ts; + /*! \brief Bitrate tracker for sent and acked packets */ + janus_bwe_stream_bitrate *sent, *acked; + /*! \brief How much delay has been accumulated in the last feedback (may be negative) */ + int64_t delay; + /*! \brief Accumulated delay over time */ + janus_bwe_delay_tracker *delays; + /*! \brief Number of packets with a received status, and number of lost ones */ + uint16_t received_pkts, lost_pkts; + /*! \brief Latest average delay */ + double avg_delay; + /*! \brief Latest loss ratio */ + double loss_ratio; + /*! \brief Latest estimated bitrate */ + uint32_t estimate; + /*! \brief Whether we can notify the plugin about the estimate */ + gboolean notify_plugin; + /*! \brief When we last notified the plugin */ + int64_t last_notified; + /*! \brief CSV where we save the debugging information */ + FILE *csv; + /*! \brief UDP socket where to send the debugging information */ + int fd; +} janus_bwe_context; +/*! \brief Helper to create a new bandwidth estimation context + * @returns a new janus_bwe_context instance, if successful, or NULL otherwise */ +janus_bwe_context *janus_bwe_context_create(void); +/*! \brief Helper to destroy an existing bandwidth estimation context + * @param[in] bew The janus_bwe_context instance to destroy */ +void janus_bwe_context_destroy(janus_bwe_context *bwe); + +/*! \brief Helper method to quickly add a new inflight packet to a BWE instance + * @param[in] bwe The janus_bwe_context instance to update + * @param[in] seq The TWCC sequence number of the new inflight packet + * @param[in] sent The timestamp of the packet delivery + * @param[in] type The type of this packet + * @param[in] size The size of this packet + * @returns TRUE, if successful, or FALSE otherwise */ +gboolean janus_bwe_context_add_inflight(janus_bwe_context *bwe, + uint16_t seq, int64_t sent, janus_bwe_packet_type type, int size); +/*! \brief Handle feedback on an inflight packet + * @param[in] bwe The janus_bwe_context instance to update + * @param[in] seq The TWCC sequence number of the inflight packet we have feedback for + * @param[in] status Feedback status for the packet + * @param[in] delta_us If the packet was received, the delta that was provided + * @param[in] first True if this is the first received packet in a TWCC feedback */ +void janus_bwe_context_handle_feedback(janus_bwe_context *bwe, + uint16_t seq, janus_bwe_twcc_status status, int64_t delta_us, gboolean first); +/*! \brief Update the internal BWE context state with a new tick + * @param[in] bwe The janus_bwe_context instance to update */ +void janus_bwe_context_update(janus_bwe_context *bwe); + +/*! \brief Helper method to start saving the stats related to the BWE processing to a CSV file + * @param[in] bwe The janus_bwe_context instance to save + * @param[in] path Path where to save the file to + * @returns TRUE, if successful, or FALSE otherwise */ +gboolean janus_bwe_save_csv(janus_bwe_context *bwe, const char *path); +/*! \brief Helper method to stop saving the stats related to the BWE processing to a CSV file + * @param[in] bwe The janus_bwe_context instance to update */ +void janus_bwe_close_csv(janus_bwe_context *bwe); +/*! \brief Helper method to relay stats related to the BWE processing to an external UDP address + * @param[in] bwe The janus_bwe_context instance to save + * @param[in] host The address to send stats to + * @param[in] port The port to send stats to + * @returns TRUE, if successful, or FALSE otherwise */ +gboolean janus_bwe_save_live(janus_bwe_context *bwe, const char *host, uint16_t port); +/*! \brief Helper method to stop relaying the stats related to the BWE processing + * @param[in] bwe The janus_bwe_context instance to update */ +void janus_bwe_close_live(janus_bwe_context *bwe); + +#endif diff --git a/src/ice.c b/src/ice.c index 4f258993a1..7276595024 100644 --- a/src/ice.c +++ b/src/ice.c @@ -473,7 +473,9 @@ typedef struct janus_ice_queued_packet { gint type; gboolean control; gboolean retransmission; + gboolean probing; gboolean encrypted; + guint16 twcc_seq; gint64 added; } janus_ice_queued_packet; /* A few static, fake, messages we use as a trigger: e.g., to start a @@ -483,6 +485,10 @@ static janus_ice_queued_packet janus_ice_add_candidates, janus_ice_dtls_handshake, janus_ice_media_stopped, + janus_ice_enable_bwe, + janus_ice_set_bwe_target, + janus_ice_debug_bwe, + janus_ice_disable_bwe, janus_ice_hangup_peerconnection, janus_ice_detach_handle, janus_ice_data_ready; @@ -538,6 +544,7 @@ typedef struct janus_ice_outgoing_traffic { } janus_ice_outgoing_traffic; static gboolean janus_ice_outgoing_rtcp_handle(gpointer user_data); static gboolean janus_ice_outgoing_stats_handle(gpointer user_data); +static gboolean janus_ice_outgoing_bwe_handle(gpointer user_data); static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janus_ice_queued_packet *pkt); static gboolean janus_ice_outgoing_traffic_prepare(GSource *source, gint *timeout) { janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source; @@ -660,6 +667,8 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) { if(pkt == NULL || pkt == &janus_ice_start_gathering || pkt == &janus_ice_add_candidates || pkt == &janus_ice_dtls_handshake || + pkt == &janus_ice_enable_bwe || pkt == &janus_ice_set_bwe_target || + pkt == &janus_ice_debug_bwe || pkt == &janus_ice_disable_bwe || pkt == &janus_ice_media_stopped || pkt == &janus_ice_hangup_peerconnection || pkt == &janus_ice_detach_handle || @@ -1623,6 +1632,8 @@ static void janus_ice_handle_free(const janus_refcount *handle_ref) { } g_free(handle->opaque_id); g_free(handle->token); + g_free(handle->bwe_csv); + g_free(handle->bwe_host); g_free(handle); } @@ -1857,6 +1868,7 @@ static void janus_ice_peerconnection_free(const janus_refcount *pc_ref) { if(pc->rtx_payload_types_rev != NULL) g_hash_table_destroy(pc->rtx_payload_types_rev); pc->rtx_payload_types_rev = NULL; + janus_bwe_context_destroy(pc->bwe); g_free(pc); pc = NULL; } @@ -1888,6 +1900,7 @@ janus_ice_peerconnection_medium *janus_ice_peerconnection_medium_create(janus_ic medium->rtcp_ctx[0]->in_media_link_quality = 100; medium->rtcp_ctx[0]->out_link_quality = 100; medium->rtcp_ctx[0]->out_media_link_quality = 100; + medium->nack_queue_ms = min_nack_queue; /* We can address media by SSRC */ g_hash_table_insert(pc->media_byssrc, GINT_TO_POINTER(medium->ssrc), medium); janus_refcount_increase(&medium->ref); @@ -3058,7 +3071,7 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp int res = janus_rtcp_nacks(nackbuf, sizeof(nackbuf), nacks); if(res > 0) { /* Set the right local and remote SSRC in the RTCP packet */ - janus_rtcp_fix_ssrc(NULL, nackbuf, res, 1, + janus_rtcp_fix_ssrc(NULL, NULL, nackbuf, res, 1, medium->ssrc, medium->ssrc_peer[vindex]); janus_plugin_rtcp rtcp = { .mindex = medium->mindex, .video = video, .buffer = nackbuf, .length = res }; janus_ice_relay_rtcp_internal(handle, medium, &rtcp, FALSE); @@ -3137,7 +3150,7 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp /* Let's process this RTCP (compound?) packet, and update the RTCP context for this stream in case */ rtcp_context *rtcp_ctx = medium->rtcp_ctx[vindex]; uint32_t rtt = rtcp_ctx ? rtcp_ctx->rtt : 0; - if(janus_rtcp_parse(rtcp_ctx, buf, buflen) < 0) { + if(janus_rtcp_parse(rtcp_ctx, pc->bwe, buf, buflen) < 0) { /* Drop the packet if the parsing function returns with an error */ return; } @@ -3203,8 +3216,10 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp pkt->extensions = p->extensions; pkt->control = FALSE; pkt->retransmission = TRUE; + pkt->probing = TRUE; pkt->label = NULL; pkt->protocol = NULL; + pkt->twcc_seq = 0; pkt->added = janus_get_monotonic_time(); /* What to send and how depends on whether we're doing RFC4588 or not */ if(!video || !janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX)) { @@ -3894,6 +3909,58 @@ void janus_ice_resend_trickles(janus_ice_handle *handle) { janus_ice_notify_trickle(handle, NULL); } +void janus_ice_handle_enable_bwe(janus_ice_handle *handle) { + JANUS_LOG(LOG_VERB, "[%"SCNu64"] Enabling bandwidth estimation\n", handle->handle_id); + if(handle->queued_packets != NULL) { +#if GLIB_CHECK_VERSION(2, 46, 0) + g_async_queue_push_front(handle->queued_packets, &janus_ice_enable_bwe); +#else + g_async_queue_push(handle->queued_packets, &janus_ice_enable_bwe); +#endif + g_main_context_wakeup(handle->mainctx); + } +} + +/* FIXME */ +void janus_ice_handle_set_bwe_target(janus_ice_handle *handle, uint32_t bitrate) { + JANUS_LOG(LOG_VERB, "[%"SCNu64"] Configuring bandwidth estimation target: %"SCNu32"\n", handle->handle_id, bitrate); + janus_mutex_lock(&handle->mutex); + handle->bwe_target = bitrate; + janus_mutex_unlock(&handle->mutex); + if(handle->queued_packets != NULL) { +#if GLIB_CHECK_VERSION(2, 46, 0) + g_async_queue_push_front(handle->queued_packets, &janus_ice_set_bwe_target); +#else + g_async_queue_push(handle->queued_packets, &janus_ice_set_bwe_target); +#endif + g_main_context_wakeup(handle->mainctx); + } +} + +void janus_ice_handle_debug_bwe(janus_ice_handle *handle) { + JANUS_LOG(LOG_WARN, "[%"SCNu64"] Tweaking debugging of bandwidth estimation\n", handle->handle_id); + if(handle->queued_packets != NULL) { +#if GLIB_CHECK_VERSION(2, 46, 0) + g_async_queue_push_front(handle->queued_packets, &janus_ice_debug_bwe); +#else + g_async_queue_push(handle->queued_packets, &janus_ice_debug_bwe); +#endif + g_main_context_wakeup(handle->mainctx); + } +} + +void janus_ice_handle_disable_bwe(janus_ice_handle *handle) { + JANUS_LOG(LOG_VERB, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id); + if(handle->queued_packets != NULL) { +#if GLIB_CHECK_VERSION(2, 46, 0) + g_async_queue_push_front(handle->queued_packets, &janus_ice_disable_bwe); +#else + g_async_queue_push(handle->queued_packets, &janus_ice_disable_bwe); +#endif + g_main_context_wakeup(handle->mainctx); + } +} + static void janus_ice_rtp_extension_update(janus_ice_handle *handle, janus_ice_peerconnection_medium *medium, janus_ice_queued_packet *packet) { if(handle == NULL || handle->pc == NULL || medium == NULL || packet == NULL || packet->data == NULL) return; @@ -3912,7 +3979,7 @@ static void janus_ice_rtp_extension_update(janus_ice_handle *handle, janus_ice_p /* Add core and plugin extensions, if any */ gboolean video = (packet->type == JANUS_ICE_PACKET_VIDEO); if(handle->pc->mid_ext_id > 0 || (video && handle->pc->abs_send_time_ext_id > 0) || - (video && handle->pc->transport_wide_cc_ext_id > 0) || + (medium->do_twcc && handle->pc->transport_wide_cc_ext_id > 0) || (!video && packet->extensions.audio_level > -1 && handle->pc->audiolevel_ext_id > 0) || (video && packet->extensions.video_rotation > -1 && handle->pc->videoorientation_ext_id > 0) || (video && packet->extensions.min_delay > -1 && packet->extensions.max_delay > -1 && handle->pc->playoutdelay_ext_id > 0) || @@ -3950,9 +4017,10 @@ static void janus_ice_rtp_extension_update(janus_ice_handle *handle, janus_ice_p } } /* Check if we need to add the transport-wide CC extension */ - if(video && handle->pc->transport_wide_cc_ext_id > 0) { + if(medium->do_twcc && handle->pc->transport_wide_cc_ext_id > 0) { handle->pc->transport_wide_cc_out_seq_num++; - uint16_t transSeqNum = htons(handle->pc->transport_wide_cc_out_seq_num); + packet->twcc_seq = handle->pc->transport_wide_cc_out_seq_num; + uint16_t transSeqNum = htons(packet->twcc_seq); if(!use_2byte) { *index = (handle->pc->transport_wide_cc_ext_id << 4) + 1; memcpy(index+1, &transSeqNum, 2); @@ -4144,7 +4212,7 @@ static void janus_ice_rtp_extension_update(janus_ice_handle *handle, janus_ice_p } static gint rtcp_transport_wide_cc_stats_comparator(gconstpointer item1, gconstpointer item2) { - return ((rtcp_transport_wide_cc_stats*)item1)->transport_seq_num - ((rtcp_transport_wide_cc_stats*)item2)->transport_seq_num; + return ((janus_rtcp_transport_wide_cc_stats *)item1)->transport_seq_num - ((janus_rtcp_transport_wide_cc_stats *)item2)->transport_seq_num; } static gboolean janus_ice_outgoing_transport_wide_cc_feedback(gpointer user_data) { janus_ice_handle *handle = (janus_ice_handle *)user_data; @@ -4502,6 +4570,153 @@ static gboolean janus_ice_outgoing_stats_handle(gpointer user_data) { return G_SOURCE_CONTINUE; } +static gboolean janus_ice_outgoing_bwe_handle(gpointer user_data) { + janus_ice_handle *handle = (janus_ice_handle *)user_data; + janus_ice_peerconnection *pc = handle->pc; + if(pc == NULL || pc->bwe == NULL) + return G_SOURCE_CONTINUE; + /* First of all, let's check if we have to notify the plugin */ + if(pc->bwe->notify_plugin) { + /* Notify the plugin about the current estimate */ + pc->bwe->notify_plugin = FALSE; + janus_plugin *plugin = (janus_plugin *)handle->app; + if(plugin && plugin->estimated_bandwidth && janus_plugin_session_is_alive(handle->app_handle) && + !g_atomic_int_get(&handle->destroyed)) + plugin->estimated_bandwidth(handle->app_handle, pc->bwe->estimate); + } + /* Now, let's check if there's probing we need to perform */ + uint32_t bitrate_out = (pc->bwe->sent->packets[janus_bwe_packet_type_regular*3] ? pc->bwe->sent->bitrate[janus_bwe_packet_type_regular*3] : 0) + + (pc->bwe->sent->packets[janus_bwe_packet_type_rtx*3] ? pc->bwe->sent->bitrate[janus_bwe_packet_type_rtx*3] : 0); + if(pc->bwe->probing_target == 0 || pc->bwe->probing_target <= bitrate_out) + return G_SOURCE_CONTINUE; + if(pc->bwe->status != janus_bwe_status_start && pc->bwe->status != janus_bwe_status_regular) { + /* The BWE status may be lossy, congested, or recovering: don't probe for now */ + return G_SOURCE_CONTINUE; + } + gint64 now = janus_get_monotonic_time(); + if(pc->bwe->probing_deferred > 0) { + if(now < pc->bwe->probing_deferred) { + /* The probing has been deferred for a few seconds */ + return G_SOURCE_CONTINUE; + } + pc->bwe->probing_deferred = 0; + pc->bwe->probing_buildup_timer = now; + } + /* Get the medium instance we'll use for probing */ + janus_ice_peerconnection_medium *medium = NULL; + if(pc->bwe->probing_mindex != -1) + medium = g_hash_table_lookup(pc->media, GINT_TO_POINTER(pc->bwe->probing_mindex)); + if(medium == NULL || !medium->send || medium->rtx_payload_type == -1 || g_queue_is_empty(medium->retransmit_buffer)) { + /* We don't have a video medium we can use for probing (anymore?) */ + pc->bwe->probing_mindex = -1; + return G_SOURCE_CONTINUE; + } + int attempts = 10; + uint16_t seqnr = medium->last_rtp_seqnum; + janus_rtp_packet *p = NULL; + while(attempts > 0) { + p = g_hash_table_lookup(medium->retransmit_seqs, GUINT_TO_POINTER(seqnr)); + if(p == NULL) { + attempts--; + seqnr--; + continue; + } + /* FIXME Should we check if the packet is large enough? */ + break; + } + if(p == NULL) { + /* We don't have a probing packet to send */ + return G_SOURCE_CONTINUE; + } + /* Check how many duplicates we have to send of this packet */ + if(pc->bwe->probing_count == 20) { + pc->bwe->probing_count = 0; + pc->bwe->probing_sent = 0; + pc->bwe->probing_portion = 0.0; + } + pc->bwe->probing_count++; + /* Check if we need to build up probing */ + uint32_t required = pc->bwe->probing_target - bitrate_out; + if(pc->bwe->probing_buildup_timer > 0) { + if(pc->bwe->probing_buildup == 0) + pc->bwe->probing_buildup = pc->bwe->probing_buildup_step; + uint32_t gap = (pc->bwe->probing_buildup_step >= 10000 ? 500000 : 200000); + if(now - pc->bwe->probing_buildup_timer >= gap) { + pc->bwe->probing_buildup_step += 1000; + if(pc->bwe->probing_buildup_step > 10000) + pc->bwe->probing_buildup_step = 10000; + pc->bwe->probing_buildup += pc->bwe->probing_buildup_step; + pc->bwe->probing_buildup_timer = now; + } + if(pc->bwe->probing_buildup >= required) + pc->bwe->probing_buildup = required; + required = pc->bwe->probing_buildup; + } + if(pc->bwe->probing_sent >= required) { + /* We sent enough for this round */ + return G_SOURCE_CONTINUE; + } + uint32_t required_now = required / (8 * 20); + double prev_portion = pc->bwe->probing_portion; + double portion = (double)required_now / (double)(p->length+SRTP_MAX_TAG_LEN); + double new_portion = prev_portion + portion; + int duplicates = (int)(new_portion); + if(new_portion - (double)duplicates > 0.5) + duplicates++; + if(duplicates == 0) { + /* Skip this round, we'll send something later */ + pc->bwe->probing_portion = new_portion; + return G_SOURCE_CONTINUE; + } + pc->bwe->probing_portion = ((double)duplicates < new_portion) ? + (new_portion - (double)duplicates) : 0; + /* FIXME We have a packet we can retransmit for probing purposes, enqueue it N times */ + JANUS_LOG(LOG_DBG, "[%"SCNu64"] #%d: Scheduling %u for retransmission (probing, pkt_size=%d)\n", + handle->handle_id, (pc->bwe->probing_count - 1), seqnr, p->length+SRTP_MAX_TAG_LEN); + int i = 0; + uint32_t enqueued = 0; + for(i=0; i < duplicates; i++) { + p->last_retransmit = now; + /* Enqueue it */ + janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet)); + pkt->mindex = medium->mindex; + pkt->data = g_malloc(p->length+SRTP_MAX_TAG_LEN); + memcpy(pkt->data, p->data, p->length); + pkt->length = p->length; + pkt->type = JANUS_ICE_PACKET_VIDEO; + pkt->extensions = p->extensions; + pkt->control = FALSE; + pkt->retransmission = TRUE; + pkt->probing = TRUE; + pkt->label = NULL; + pkt->protocol = NULL; + pkt->twcc_seq = 0; + pkt->added = janus_get_monotonic_time(); + pkt->encrypted = FALSE; + janus_rtp_header *header = (janus_rtp_header *)pkt->data; + header->type = medium->rtx_payload_type; + header->ssrc = htonl(medium->ssrc_rtx); + medium->rtx_seq_number++; + header->seq_number = htons(medium->rtx_seq_number); + if(handle->queued_packets != NULL) { +#if GLIB_CHECK_VERSION(2, 46, 0) + g_async_queue_push_front(handle->queued_packets, pkt); +#else + g_async_queue_push(handle->queued_packets, pkt); +#endif + g_main_context_wakeup(handle->mainctx); + } else { + janus_ice_free_queued_packet(pkt); + } + enqueued += p->length+SRTP_MAX_TAG_LEN; + } + pc->bwe->probing_sent += (enqueued * 8); + JANUS_LOG(LOG_DBG, "[%"SCNu64"] -- Enqueued %d duplicates (%"SCNu32" bytes, %"SCNu32" kbps; sent=%"SCNu32"/%"SCNu32")\n", + handle->handle_id, duplicates, enqueued, (enqueued/1000)*8, pc->bwe->probing_sent, required); + /* Done */ + return G_SOURCE_CONTINUE; +} + static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janus_ice_queued_packet *pkt) { janus_session *session = (janus_session *)handle->session; janus_ice_peerconnection *pc = handle->pc; @@ -4555,6 +4770,90 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu guint id = g_source_attach(pc->dtlsrt_source, handle->mainctx); JANUS_LOG(LOG_VERB, "[%"SCNu64"] Creating retransmission timer with ID %u\n", handle->handle_id, id); return G_SOURCE_CONTINUE; + } else if(pkt == &janus_ice_enable_bwe) { + /* Create a new bandwidth estimation context for this handle */ + if(pc == NULL || pc->bwe != NULL) + return G_SOURCE_CONTINUE; + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling bandwidth estimation\n", handle->handle_id); + pc->bwe = janus_bwe_context_create(); + janus_mutex_lock(&handle->mutex); + handle->bwe_target = 0; + /* Check if we need debugging */ + if(handle->bwe_csv) { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling CSV debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_save_csv(pc->bwe, handle->bwe_csv); + } + if(handle->bwe_host && handle->bwe_port > 0) { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling live debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_save_live(pc->bwe, handle->bwe_host, handle->bwe_port); + } + janus_mutex_unlock(&handle->mutex); + /* Let's create a source for BWE, for plugin notifications and probing */ + handle->bwe_source = g_timeout_source_new(50); /* FIXME */ + g_source_set_priority(handle->bwe_source, G_PRIORITY_DEFAULT); + g_source_set_callback(handle->bwe_source, janus_ice_outgoing_bwe_handle, handle, NULL); + g_source_attach(handle->bwe_source, handle->mainctx); + return G_SOURCE_CONTINUE; + } else if(pkt == &janus_ice_set_bwe_target) { + /* Enable probing for the BWE context (assuming BWE is active) */ + if(pc == NULL || pc->bwe == NULL) + return G_SOURCE_CONTINUE; + janus_mutex_lock(&handle->mutex); + uint32_t target = handle->bwe_target; + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Configuring bandwidth estimation target: %"SCNu32"\n", handle->handle_id, target); + if(target == pc->bwe->probing_target) { + /* Nothing to do */ + janus_mutex_unlock(&handle->mutex); + return G_SOURCE_CONTINUE; + } + janus_mutex_unlock(&handle->mutex); + /* Let's configure probing accordingly */ + pc->bwe->probing_target = target; + pc->bwe->probing_count = 0; + pc->bwe->probing_sent = 0; + pc->bwe->probing_portion = 0.0; + pc->bwe->probing_buildup = 0; + //~ pc->bwe->probing_buildup_timer = janus_get_monotonic_time(); + pc->bwe->probing_deferred = janus_get_monotonic_time() + G_USEC_PER_SEC; + return G_SOURCE_CONTINUE; + } else if(pkt == &janus_ice_debug_bwe) { + /* Enable or disable debugging for the bandwidth estimator */ + if(pc == NULL || pc->bwe == NULL) + return G_SOURCE_CONTINUE; + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Tweaking debugging for bandwidth estimation\n", handle->handle_id); + janus_mutex_lock(&handle->mutex); + if(handle->bwe_csv) { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling CSV debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_save_csv(pc->bwe, handle->bwe_csv); + } else { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling CSV debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_close_csv(pc->bwe); + } + if(handle->bwe_host && handle->bwe_port > 0) { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling live debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_save_live(pc->bwe, handle->bwe_host, handle->bwe_port); + } else { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling live debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_close_live(pc->bwe); + } + janus_mutex_unlock(&handle->mutex); + return G_SOURCE_CONTINUE; + } else if(pkt == &janus_ice_disable_bwe) { + /* We need to get rid of the bandwidth estimator */ + if(pc == NULL || pc->bwe == NULL) + return G_SOURCE_CONTINUE; + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id); + if(handle->bwe_source) { + g_source_destroy(handle->bwe_source); + g_source_unref(handle->bwe_source); + handle->bwe_source = NULL; + } + janus_bwe_context_destroy(pc->bwe); + pc->bwe = NULL; + janus_mutex_lock(&handle->mutex); + handle->bwe_target = 0; + janus_mutex_unlock(&handle->mutex); + return G_SOURCE_CONTINUE; } else if(pkt == &janus_ice_media_stopped) { /* Some media has been disabled on the way in, so use the callback to notify the peer */ if(pc == NULL) @@ -4599,6 +4898,11 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu g_source_unref(handle->twcc_source); handle->twcc_source = NULL; } + if(handle->bwe_source) { + g_source_destroy(handle->bwe_source); + g_source_unref(handle->bwe_source); + handle->bwe_source = NULL; + } if(handle->stats_source) { g_source_destroy(handle->stats_source); g_source_unref(handle->stats_source); @@ -4869,6 +5173,9 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu p->extensions = pkt->extensions; /* Copy the payload */ memcpy(p->data+hsize+2, payload, pkt->length - hsize); + /* Check if there's a BWE context and we need a medium for probing */ + if(pc->bwe && pc->bwe->probing_mindex == -1) + pc->bwe->probing_mindex = pkt->mindex; } /* Encrypt SRTP */ int protected = pkt->length; @@ -4891,6 +5198,15 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu if(sent < protected) { JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, protected); } + /* Keep track of the packet if we're using TWCC */ + if(medium->do_twcc && handle->pc->transport_wide_cc_ext_id > 0) { + janus_bwe_packet_type type = janus_bwe_packet_type_regular; + if(pkt->probing) + type = janus_bwe_packet_type_probing; + else if(pkt->retransmission) + type = janus_bwe_packet_type_rtx; + janus_bwe_context_add_inflight(pc->bwe, pkt->twcc_seq, janus_get_monotonic_time(), type, protected); + } /* Update stats */ if(sent > 0) { /* Update the RTCP context as well */ @@ -4914,6 +5230,7 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu if(medium->last_ntp_ts == 0 || (gint32)(timestamp - medium->last_rtp_ts) > 0) { medium->last_ntp_ts = (gint64)tv.tv_sec*G_USEC_PER_SEC + tv.tv_usec; medium->last_rtp_ts = timestamp; + medium->last_rtp_seqnum = ntohs(header->seq_number); } if(medium->first_ntp_ts[0] == 0) { medium->first_ntp_ts[0] = (gint64)tv.tv_sec*G_USEC_PER_SEC + tv.tv_usec; @@ -5037,8 +5354,10 @@ void janus_ice_relay_rtp(janus_ice_handle *handle, janus_plugin_rtp *packet) { pkt->control = FALSE; pkt->encrypted = FALSE; pkt->retransmission = FALSE; + pkt->probing = FALSE; pkt->label = NULL; pkt->protocol = NULL; + pkt->twcc_seq = 0; pkt->added = janus_get_monotonic_time(); janus_ice_queue_packet(handle, pkt); } @@ -5065,7 +5384,7 @@ void janus_ice_relay_rtcp_internal(janus_ice_handle *handle, janus_ice_peerconne * ones created by the core already have the right SSRCs in the right place */ JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Fixing SSRCs (local %u, peer %u)\n", handle->handle_id, medium->ssrc, medium->ssrc_peer[0]); - janus_rtcp_fix_ssrc(NULL, rtcp_buf, rtcp_len, 1, + janus_rtcp_fix_ssrc(NULL, NULL, rtcp_buf, rtcp_len, 1, medium->ssrc, medium->ssrc_peer[0]); } /* Queue this packet */ @@ -5079,8 +5398,10 @@ void janus_ice_relay_rtcp_internal(janus_ice_handle *handle, janus_ice_peerconne pkt->control = TRUE; pkt->encrypted = FALSE; pkt->retransmission = FALSE; + pkt->probing = FALSE; pkt->label = NULL; pkt->protocol = NULL; + pkt->twcc_seq = 0; pkt->added = janus_get_monotonic_time(); janus_ice_queue_packet(handle, pkt); if(rtcp_buf != packet->buffer) { @@ -5115,7 +5436,7 @@ void janus_ice_relay_rtcp(janus_ice_handle *handle, janus_plugin_rtcp *packet) { char plibuf[12]; memset(plibuf, 0, 12); janus_rtcp_pli((char *)&plibuf, 12); - janus_rtcp_fix_ssrc(NULL, plibuf, sizeof(plibuf), 1, + janus_rtcp_fix_ssrc(NULL, NULL, plibuf, sizeof(plibuf), 1, medium->ssrc, medium->ssrc_peer[1]); janus_plugin_rtcp rtcp = { .mindex = medium->mindex, .video = TRUE, .buffer = plibuf, .length = sizeof(plibuf) }; janus_ice_relay_rtcp_internal(handle, medium, &rtcp, FALSE); @@ -5124,7 +5445,7 @@ void janus_ice_relay_rtcp(janus_ice_handle *handle, janus_plugin_rtcp *packet) { char plibuf[12]; memset(plibuf, 0, 12); janus_rtcp_pli((char *)&plibuf, 12); - janus_rtcp_fix_ssrc(NULL, plibuf, sizeof(plibuf), 1, + janus_rtcp_fix_ssrc(NULL, NULL, plibuf, sizeof(plibuf), 1, medium->ssrc, medium->ssrc_peer[2]); janus_plugin_rtcp rtcp = { .mindex = medium->mindex, .video = TRUE, .buffer = plibuf, .length = sizeof(plibuf) }; janus_ice_relay_rtcp_internal(handle, medium, &rtcp, FALSE); @@ -5178,8 +5499,10 @@ void janus_ice_relay_data(janus_ice_handle *handle, janus_plugin_data *packet) { pkt->control = FALSE; pkt->encrypted = FALSE; pkt->retransmission = FALSE; + pkt->probing = FALSE; pkt->label = packet->label ? g_strdup(packet->label) : NULL; pkt->protocol = packet->protocol ? g_strdup(packet->protocol) : NULL; + pkt->twcc_seq = 0; pkt->added = janus_get_monotonic_time(); janus_ice_queue_packet(handle, pkt); } @@ -5205,8 +5528,10 @@ void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length) { pkt->control = FALSE; pkt->encrypted = FALSE; pkt->retransmission = FALSE; + pkt->probing = FALSE; pkt->label = NULL; pkt->protocol = NULL; + pkt->twcc_seq = 0; pkt->added = janus_get_monotonic_time(); janus_ice_queue_packet(handle, pkt); #endif diff --git a/src/ice.h b/src/ice.h index 216dcb4d61..a50c59c30a 100644 --- a/src/ice.h +++ b/src/ice.h @@ -28,6 +28,7 @@ #include "dtls.h" #include "sctp.h" #include "rtcp.h" +#include "bwe.h" #include "text2pcap.h" #include "utils.h" #include "ip-utils.h" @@ -378,8 +379,8 @@ struct janus_ice_handle { void *static_event_loop; /*! \brief GLib thread for the handle and libnice */ GThread *thread; - /*! \brief GLib sources for outgoing traffic, recurring RTCP, and stats (and optionally TWCC) */ - GSource *rtp_source, *rtcp_source, *stats_source, *twcc_source; + /*! \brief GLib sources for outgoing traffic, recurring RTCP, and stats (and optionally TWCC/BWE/probing) */ + GSource *rtp_source, *rtcp_source, *stats_source, *twcc_source, *bwe_source; /*! \brief libnice ICE agent */ NiceAgent *agent; /*! \brief Monotonic time of when the ICE agent has been created */ @@ -414,6 +415,14 @@ struct janus_ice_handle { gint last_srtp_error, last_srtp_summary; /*! \brief Count of how many seconds passed since the last stats passed to event handlers */ gint last_event_stats; + /*! \brief Bandwidth estimation target, if any, as asked by the plugin */ + uint32_t bwe_target; + /*! \brief In case offline BWE debugging is enabled, the CSV file to save to */ + char *bwe_csv; + /*! \brief In case live BWE debugging is enabled, the host to send stats to */ + char *bwe_host; + /*! \brief In case live BWE debugging is enabled, the port to send stats to */ + uint16_t bwe_port; /*! \brief Flag to decide whether or not packets need to be dumped to a text2pcap file */ volatile gint dump_packets; /*! \brief In case this session must be saved to text2pcap, the instance to dump packets to */ @@ -531,6 +540,8 @@ struct janus_ice_peerconnection { GHashTable *rtx_payload_types_rev; /*! \brief Helper flag to avoid flooding the console with the same error all over again */ gboolean noerrorlog; + /*! \brief Bandwidth estimation context */ + janus_bwe_context *bwe; /*! \brief Mutex to lock/unlock this stream */ janus_mutex mutex; /*! \brief Atomic flag to check if this instance has been destroyed */ @@ -602,8 +613,12 @@ struct janus_ice_peerconnection_medium { gint64 last_ntp_ts; /*! \brief Last sent RTP timestamp */ guint32 last_rtp_ts; + /*! \brief Last sent RTP sequence number */ + guint16 last_rtp_seqnum; /*! \brief Whether we should do NACKs (in or out) for this medium */ gboolean do_nacks; + /*! \brief Whether we should do Transport Wide CC for this medium */ + gboolean do_twcc; /*! \brief List of previously sent janus_rtp_packet RTP packets, in case we receive NACKs */ GQueue *retransmit_buffer; /*! \brief HashTable of retransmittable sequence numbers, in case we receive NACKs */ @@ -788,6 +803,19 @@ void janus_ice_restart(janus_ice_handle *handle); /*! \brief Method to resend all the existing candidates via trickle (e.g., after an ICE restart) * @param[in] handle The Janus ICE handle this method refers to */ void janus_ice_resend_trickles(janus_ice_handle *handle); +/*! \brief Method to dynamically enable bandwidth estimation for a handle + * @param[in] handle The Janus ICE handle this method refers to */ +void janus_ice_handle_enable_bwe(janus_ice_handle *handle); +/*! \brief Method to dynamically tweak the bandwidth estimation target for a handle (for probing) + * @param[in] handle The Janus ICE handle this method refers to + * @param[in] bitrate The bitrate to target (will be used to generate probing) */ +void janus_ice_handle_set_bwe_target(janus_ice_handle *handle, uint32_t bitrate); +/*! \brief Method to dynamically tweak the bandwidth estimation debugging for a handle + * @param[in] handle The Janus ICE handle this method refers to */ +void janus_ice_handle_debug_bwe(janus_ice_handle *handle); +/*! \brief Method to dynamically disable bandwidth estimation for a handle + * @param[in] handle The Janus ICE handle this method refers to */ +void janus_ice_handle_disable_bwe(janus_ice_handle *handle); ///@} diff --git a/src/janus.c b/src/janus.c index ec1e43325a..5ece3402d1 100644 --- a/src/janus.c +++ b/src/janus.c @@ -189,6 +189,13 @@ static struct janus_json_parameter text2pcap_parameters[] = { {"filename", JSON_STRING, 0}, {"truncate", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE} }; +static struct janus_json_parameter debug_bwe_parameters[] = { + {"csv", JANUS_JSON_BOOL, 0}, + {"path", JSON_STRING, 0}, + {"live", JANUS_JSON_BOOL, 0}, + {"host", JSON_STRING, 0}, + {"port", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE} +}; static struct janus_json_parameter handleinfo_parameters[] = { {"plugin_only", JANUS_JSON_BOOL, 0} }; @@ -612,6 +619,9 @@ void janus_plugin_relay_data(janus_plugin_session *plugin_session, janus_plugin_ void janus_plugin_send_pli(janus_plugin_session *plugin_session); void janus_plugin_send_pli_stream(janus_plugin_session *plugin_session, int mindex); void janus_plugin_send_remb(janus_plugin_session *plugin_session, uint32_t bitrate); +void janus_plugin_enable_bwe(janus_plugin_session *plugin_session); +void janus_plugin_set_bwe_target(janus_plugin_session *plugin_session, uint32_t bitrate); +void janus_plugin_disable_bwe(janus_plugin_session *plugin_session); void janus_plugin_close_pc(janus_plugin_session *plugin_session); void janus_plugin_end_session(janus_plugin_session *plugin_session); void janus_plugin_notify_event(janus_plugin *plugin, janus_plugin_session *plugin_session, json_t *event); @@ -627,6 +637,9 @@ static janus_callbacks janus_handler_plugin = .send_pli = janus_plugin_send_pli, .send_pli_stream = janus_plugin_send_pli_stream, .send_remb = janus_plugin_send_remb, + .enable_bwe = janus_plugin_enable_bwe, + .set_bwe_target = janus_plugin_set_bwe_target, + .disable_bwe = janus_plugin_disable_bwe, .close_pc = janus_plugin_close_pc, .end_session = janus_plugin_end_session, .events_is_enabled = janus_events_is_enabled, @@ -3007,8 +3020,88 @@ int janus_process_incoming_admin_request(janus_request *request) { /* Send the success reply */ ret = janus_process_success(request, reply); goto jsondone; + } else if(!strcasecmp(message_text, "debug_bwe")) { + /* Enable or disable BWE debugging (to CSV and/or external UDP address) */ + JANUS_VALIDATE_JSON_OBJECT(root, debug_bwe_parameters, + error_code, error_cause, FALSE, + JANUS_ERROR_MISSING_MANDATORY_ELEMENT, JANUS_ERROR_INVALID_ELEMENT_TYPE); + json_t *csv = json_object_get(root, "csv"); + json_t *live = json_object_get(root, "live"); + gboolean changes = FALSE; + janus_mutex_lock(&handle->mutex); + if(json_is_true(csv)) { + /* Enable CSV offline debugging */ + if(handle->bwe_csv) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNKNOWN, + "CSV debugging already configured"); + goto jsondone; + } + json_t *path = json_object_get(root, "path"); + const char *path_value = json_string_value(path); + if(path_value == NULL) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, + "Missing or invalid element (path)"); + goto jsondone; + } + changes = TRUE; + handle->bwe_csv = g_strdup(path_value); + } else { + /* Disable CSV offline debugging */ + if(handle->bwe_csv) + changes = TRUE; + g_free(handle->bwe_csv); + handle->bwe_csv = NULL; + } + if(json_is_true(live)) { + /* Enable live debugging */ + if(handle->bwe_host) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNKNOWN, + "Live debugging already configured"); + goto jsondone; + } + json_t *host = json_object_get(root, "host"); + json_t *port = json_object_get(root, "port"); + const char *host_value = json_string_value(host); + uint16_t port_value = json_integer_value(port); + if(host_value == NULL) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, + "Missing or invalid element (host)"); + goto jsondone; + } + if(port_value == 0) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, + "Missing or invalid element (port)"); + goto jsondone; + } + changes = TRUE; + handle->bwe_host = g_strdup(host_value); + handle->bwe_port = port_value; + } else { + /* Disable live debugging */ + if(handle->bwe_host) + changes = TRUE; + g_free(handle->bwe_host); + handle->bwe_host = NULL; + handle->bwe_port = 0; + } + janus_mutex_unlock(&handle->mutex); + /* Apply the changes, if needed */ + if(changes) + janus_ice_handle_debug_bwe(handle); + /* Prepare JSON reply */ + json_t *reply = json_object(); + json_object_set_new(reply, "janus", json_string("success")); + json_object_set_new(reply, "transaction", json_string(transaction_text)); + /* Send the success reply */ + ret = janus_process_success(request, reply); + goto jsondone; } - /* If this is not a request to start/stop debugging to text2pcap, it must be a handle_info */ + /* If we git here, it must be a handle_info */ if(strcasecmp(message_text, "handle_info")) { ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text); goto jsondone; @@ -3284,6 +3377,10 @@ json_t *janus_admin_peerconnection_summary(janus_ice_peerconnection *pc) { json_object_set_new(bwe, "twcc", pc->do_transport_wide_cc ? json_true() : json_false()); if(pc->transport_wide_cc_ext_id >= 0) json_object_set_new(bwe, "twcc-ext-id", json_integer(pc->transport_wide_cc_ext_id)); + if(pc->bwe) { + json_object_set_new(bwe, "estimate", json_integer(pc->bwe->estimate)); + json_object_set_new(bwe, "status", json_string(janus_bwe_status_description(pc->bwe->status))); + } json_object_set_new(w, "bwe", bwe); json_t *media = json_object(); /* Iterate on all media */ @@ -3329,6 +3426,7 @@ json_t *janus_admin_peerconnection_medium_summary(janus_ice_peerconnection_mediu if(medium->type != JANUS_MEDIA_DATA) { json_object_set_new(m, "do_nacks", medium->do_nacks ? json_true() : json_false()); json_object_set_new(m, "nack-queue-ms", json_integer(medium->nack_queue_ms)); + json_object_set_new(m, "do_twcc", medium->do_twcc ? json_true() : json_false()); } if(medium->type != JANUS_MEDIA_DATA) { json_t *ms = json_object(); @@ -4276,6 +4374,30 @@ void janus_plugin_send_remb(janus_plugin_session *plugin_session, uint32_t bitra janus_ice_send_remb(handle, bitrate); } +void janus_plugin_enable_bwe(janus_plugin_session *plugin_session) { + janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle; + if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) + || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) + return; + janus_ice_handle_enable_bwe(handle); +} + +void janus_plugin_set_bwe_target(janus_plugin_session *plugin_session, uint32_t bitrate) { + janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle; + if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) + || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) + return; + janus_ice_handle_set_bwe_target(handle, bitrate); +} + +void janus_plugin_disable_bwe(janus_plugin_session *plugin_session) { + janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle; + if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) + || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) + return; + janus_ice_handle_disable_bwe(handle); +} + static gboolean janus_plugin_close_pc_internal(gpointer user_data) { /* We actually enforce the close_pc here */ janus_plugin_session *plugin_session = (janus_plugin_session *) user_data; diff --git a/src/plugins/janus_echotest.c b/src/plugins/janus_echotest.c index 5e54c15125..5f8bd18a28 100644 --- a/src/plugins/janus_echotest.c +++ b/src/plugins/janus_echotest.c @@ -125,6 +125,7 @@ #include "../rtcp.h" #include "../sdp-utils.h" #include "../utils.h" +#include "../bwe.h" /* Plugin information */ @@ -155,6 +156,7 @@ void janus_echotest_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtc void janus_echotest_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet); void janus_echotest_data_ready(janus_plugin_session *handle); void janus_echotest_slow_link(janus_plugin_session *handle, int mindex, gboolean video, gboolean uplink); +void janus_echotest_estimated_bandwidth(janus_plugin_session *handle, uint32_t estimate); void janus_echotest_hangup_media(janus_plugin_session *handle); void janus_echotest_destroy_session(janus_plugin_session *handle, int *error); json_t *janus_echotest_query_session(janus_plugin_session *handle); @@ -182,6 +184,7 @@ static janus_plugin janus_echotest_plugin = .incoming_data = janus_echotest_incoming_data, .data_ready = janus_echotest_data_ready, .slow_link = janus_echotest_slow_link, + .estimated_bandwidth = janus_echotest_estimated_bandwidth, .hangup_media = janus_echotest_hangup_media, .destroy_session = janus_echotest_destroy_session, .query_session = janus_echotest_query_session, @@ -253,6 +256,7 @@ typedef struct janus_echotest_session { janus_vp8_simulcast_context vp8_context; gboolean svc; janus_rtp_svc_context svc_context; + janus_bwe_stream_bitrate *ab, *vb; janus_recorder *arc; /* The Janus recorder instance for this user's audio, if enabled */ janus_recorder *vrc; /* The Janus recorder instance for this user's video, if enabled */ janus_recorder *drc; /* The Janus recorder instance for this user's data, if enabled */ @@ -525,6 +529,18 @@ json_t *janus_echotest_query_session(janus_plugin_session *handle) { json_object_set_new(info, "temporal-layer", json_integer(session->svc_context.temporal)); json_object_set_new(info, "temporal-layer-target", json_integer(session->svc_context.temporal_target)); } + if(session->ab && session->ab->packets[0]) + json_object_set_new(info, "audio-bitrate", json_integer(session->ab->bitrate[0])); + if(session->vb) { + int i=0; + for(i=0; i<9; i++) { + if(session->vb->packets[i]) { + char name[20]; + g_snprintf(name, sizeof(name), "video-bitrate-%d", i); + json_object_set_new(info, name, json_integer(session->vb->bitrate[i])); + } + } + } if(session->arc || session->vrc || session->drc) { json_t *recording = json_object(); if(session->arc && session->arc->filename) @@ -588,6 +604,8 @@ void janus_echotest_setup_media(janus_plugin_session *handle) { janus_mutex_unlock(&sessions_mutex); return; } + session->ab = janus_bwe_stream_bitrate_create(); + session->vb = janus_bwe_stream_bitrate_create(); g_atomic_int_set(&session->hangingup, 0); janus_mutex_unlock(&sessions_mutex); /* We really don't care, as we only send RTP/RTCP we get in the first place back anyway */ @@ -626,6 +644,8 @@ void janus_echotest_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp relay = janus_rtp_simulcasting_context_process_rtp(&session->sim_context, buf, len, packet->extensions.dd_content, packet->extensions.dd_len, session->ssrc, session->rid, session->vcodec, &session->context, &session->rid_mutex); + janus_bwe_stream_bitrate_update(session->vb, janus_get_monotonic_time(), + session->sim_context.substream_last, session->sim_context.temporal_last, len); } else { /* Process this SVC packet: don't relay if it's not the layer we wanted to handle */ relay = janus_rtp_svc_context_process_rtp(&session->svc_context, @@ -695,6 +715,11 @@ void janus_echotest_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp if((!video && session->audio_active) || (video && session->video_active)) { /* Save the frame if we're recording */ janus_recorder_save_frame(video ? session->vrc : session->arc, buf, len); + /* Update the bitrate counter */ + if(video) + janus_bwe_stream_bitrate_update(session->vb, janus_get_monotonic_time(), 0, 0, len); + else + janus_bwe_stream_bitrate_update(session->ab, janus_get_monotonic_time(), 0, 0, len); /* Send the frame back */ gateway->relay_rtp(handle, packet); } @@ -831,6 +856,28 @@ void janus_echotest_slow_link(janus_plugin_session *handle, int mindex, gboolean janus_refcount_decrease(&session->ref); } +void janus_echotest_estimated_bandwidth(janus_plugin_session *handle, uint32_t estimate) { + /* The core is informing us that our peer got or sent too many NACKs, are we pushing media too hard? */ + if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) + return; + janus_mutex_lock(&sessions_mutex); + janus_echotest_session *session = janus_echotest_lookup_session(handle); + if(!session) { + janus_mutex_unlock(&sessions_mutex); + JANUS_LOG(LOG_ERR, "No session associated with this handle...\n"); + return; + } + if(g_atomic_int_get(&session->destroyed)) { + janus_mutex_unlock(&sessions_mutex); + return; + } + janus_refcount_increase(&session->ref); + janus_mutex_unlock(&sessions_mutex); + /* TODO Actually use this estimate for something */ + JANUS_LOG(LOG_WARN, "[echotest] BWE=%"SCNu32"\n", estimate); + janus_refcount_decrease(&session->ref); +} + static void janus_echotest_recorder_close(janus_echotest_session *session) { if(session->arc) { janus_recorder *rc = session->arc; @@ -903,6 +950,10 @@ static void janus_echotest_hangup_media_internal(janus_plugin_session *handle) { janus_rtp_switching_context_reset(&session->context); janus_rtp_simulcasting_context_reset(&session->sim_context); janus_vp8_simulcast_context_reset(&session->vp8_context); + janus_bwe_stream_bitrate_destroy(session->ab); + session->ab = NULL; + janus_bwe_stream_bitrate_destroy(session->vb); + session->vb = NULL; session->min_delay = -1; session->max_delay = -1; g_atomic_int_set(&session->hangingup, 0); diff --git a/src/plugins/janus_nosip.c b/src/plugins/janus_nosip.c index 8bf82e8dad..26e0ed6b81 100644 --- a/src/plugins/janus_nosip.c +++ b/src/plugins/janus_nosip.c @@ -1194,7 +1194,7 @@ void janus_nosip_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp * session, video ? "video" : "audio", (video ? session->media.video_ssrc : session->media.audio_ssrc), (video ? session->media.video_ssrc_peer : session->media.audio_ssrc_peer)); - janus_rtcp_fix_ssrc(NULL, (char *)buf, len, video, + janus_rtcp_fix_ssrc(NULL, NULL, (char *)buf, len, video, (video ? session->media.video_ssrc : session->media.audio_ssrc), (video ? session->media.video_ssrc_peer : session->media.audio_ssrc_peer)); /* Is SRTP involved? */ diff --git a/src/plugins/janus_sip.c b/src/plugins/janus_sip.c index 0ee79b0d65..e89189daaa 100644 --- a/src/plugins/janus_sip.c +++ b/src/plugins/janus_sip.c @@ -2618,7 +2618,7 @@ void janus_sip_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *pa /* Fix SSRCs as the Janus core does */ JANUS_LOG(LOG_HUGE, "[SIP] Fixing SSRCs (local %u, peer %u)\n", session->media.video_ssrc, session->media.video_ssrc_peer); - janus_rtcp_fix_ssrc(NULL, (char *)buf, len, 1, session->media.video_ssrc, session->media.video_ssrc_peer); + janus_rtcp_fix_ssrc(NULL, NULL, (char *)buf, len, 1, session->media.video_ssrc, session->media.video_ssrc_peer); /* Is SRTP involved? */ if(session->media.has_srtp_local_video) { char sbuf[2048]; @@ -2648,7 +2648,7 @@ void janus_sip_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *pa /* Fix SSRCs as the Janus core does */ JANUS_LOG(LOG_HUGE, "[SIP] Fixing SSRCs (local %u, peer %u)\n", session->media.audio_ssrc, session->media.audio_ssrc_peer); - janus_rtcp_fix_ssrc(NULL, (char *)buf, len, 1, session->media.audio_ssrc, session->media.audio_ssrc_peer); + janus_rtcp_fix_ssrc(NULL, NULL, (char *)buf, len, 1, session->media.audio_ssrc, session->media.audio_ssrc_peer); /* Is SRTP involved? */ if(session->media.has_srtp_local_audio) { char sbuf[2048]; @@ -7532,7 +7532,7 @@ static void janus_sip_rtcp_pli_send(janus_sip_session *session) { /* Fix SSRCs as the Janus core does */ JANUS_LOG(LOG_HUGE, "[SIP] Fixing SSRCs (local %u, peer %u)\n", session->media.video_ssrc, session->media.video_ssrc_peer); - janus_rtcp_fix_ssrc(NULL, (char *)rtcp_buf, rtcp_len, 1, session->media.video_ssrc, session->media.video_ssrc_peer); + janus_rtcp_fix_ssrc(NULL, NULL, (char *)rtcp_buf, rtcp_len, 1, session->media.video_ssrc, session->media.video_ssrc_peer); /* Is SRTP involved? */ if(session->media.has_srtp_local_video) { char sbuf[50]; diff --git a/src/plugins/janus_streaming.c b/src/plugins/janus_streaming.c index 23213f7972..f1398ae4a9 100644 --- a/src/plugins/janus_streaming.c +++ b/src/plugins/janus_streaming.c @@ -1839,7 +1839,7 @@ static void janus_streaming_rtcp_pli_send(janus_streaming_rtp_source_stream *str char rtcp_buf[12]; int rtcp_len = 12; janus_rtcp_pli((char *)&rtcp_buf, rtcp_len); - janus_rtcp_fix_ssrc(NULL, rtcp_buf, rtcp_len, 1, 1, stream->ssrc); + janus_rtcp_fix_ssrc(NULL, NULL, rtcp_buf, rtcp_len, 1, 1, stream->ssrc); /* Send the packet */ socklen_t addrlen = stream->rtcp_addr.ss_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); int sent = 0; @@ -1862,7 +1862,7 @@ static void janus_streaming_rtcp_remb_send(janus_streaming_rtp_source *source, j char rtcp_buf[24]; int rtcp_len = 24; janus_rtcp_remb((char *)(&rtcp_buf), rtcp_len, source->lowest_bitrate); - janus_rtcp_fix_ssrc(NULL, rtcp_buf, rtcp_len, 1, 1, stream->ssrc); + janus_rtcp_fix_ssrc(NULL, NULL, rtcp_buf, rtcp_len, 1, 1, stream->ssrc); JANUS_LOG(LOG_HUGE, "Sending REMB: %"SCNu32"\n", source->lowest_bitrate); /* Reset the lowest bitrate */ source->lowest_bitrate = 0; @@ -6213,6 +6213,7 @@ static void *janus_streaming_handler(void *data) { (session->abscapturetime_src_ext_id > 0 ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_ABS_CAPTURE_TIME) : 0), JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_PLAYOUT_DELAY, (session->playoutdelay_ext ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_PLAYOUT_DELAY) : 0), + JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_TRANSPORT_WIDE_CC, janus_rtp_extension_id(JANUS_RTP_EXTMAP_TRANSPORT_WIDE_CC), JANUS_SDP_OA_DONE); } else { /* Iterate on all media streams */ diff --git a/src/plugins/janus_videoroom.c b/src/plugins/janus_videoroom.c index 217fee7bde..c1c7f115d3 100644 --- a/src/plugins/janus_videoroom.c +++ b/src/plugins/janus_videoroom.c @@ -1559,6 +1559,7 @@ void janus_videoroom_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rt void janus_videoroom_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet); void janus_videoroom_data_ready(janus_plugin_session *handle); void janus_videoroom_slow_link(janus_plugin_session *handle, int mindex, gboolean video, gboolean uplink); +void janus_videoroom_estimated_bandwidth(janus_plugin_session *handle, uint32_t estimate); void janus_videoroom_hangup_media(janus_plugin_session *handle); void janus_videoroom_destroy_session(janus_plugin_session *handle, int *error); json_t *janus_videoroom_query_session(janus_plugin_session *handle); @@ -1586,6 +1587,7 @@ static janus_plugin janus_videoroom_plugin = .incoming_data = janus_videoroom_incoming_data, .data_ready = janus_videoroom_data_ready, .slow_link = janus_videoroom_slow_link, + .estimated_bandwidth = janus_videoroom_estimated_bandwidth, .hangup_media = janus_videoroom_hangup_media, .destroy_session = janus_videoroom_destroy_session, .query_session = janus_videoroom_query_session, @@ -2143,6 +2145,8 @@ typedef struct janus_videoroom_publisher_stream { char *rid[3]; /* Only needed if simulcasting is rid-based */ int rid_extmap_id; /* rid extmap ID */ janus_mutex rid_mutex; /* Mutex to protect access to the rid array and the extmap ID */ + /* Tracker of the bitrate(s) of this stream, for BWE purposes */ + janus_bwe_stream_bitrate *bitrates; /* RTP extensions, if negotiated */ guint8 audio_level_extmap_id; /* Audio level extmap ID */ guint8 video_orient_extmap_id; /* Video orientation extmap ID */ @@ -2242,6 +2246,8 @@ typedef struct janus_videoroom_subscriber_stream { janus_vp8_simulcast_context vp8_context; /* SVC context */ janus_rtp_svc_context svc_context; + /* When we last dropped a layer because of BWE (to implement a cooldown period) */ + gint64 last_bwe_drop; /* Playout delays to enforce when relaying this stream, if the extension has been negotiated */ int16_t min_delay, max_delay; volatile gint ready, destroyed; @@ -2367,6 +2373,7 @@ static void janus_videoroom_publisher_stream_free(const janus_refcount *ps_ref) g_free(ps->h264_profile); g_free(ps->vp9_profile); janus_recorder_destroy(ps->rc); + janus_bwe_stream_bitrate_destroy(ps->bitrates); g_hash_table_destroy(ps->rtp_forwarders); ps->rtp_forwarders = NULL; janus_mutex_destroy(&ps->rtp_forwarders_mutex); @@ -2595,7 +2602,7 @@ static void janus_videoroom_rtcp_pli_send(janus_videoroom_publisher_stream *ps) int rtcp_len = 12; janus_rtcp_pli((char *)&rtcp_buf, rtcp_len); uint32_t ssrc = REMOTE_PUBLISHER_BASE_SSRC + (ps->mindex*REMOTE_PUBLISHER_SSRC_STEP); - janus_rtcp_fix_ssrc(NULL, rtcp_buf, rtcp_len, 1, 1, ssrc); + janus_rtcp_fix_ssrc(NULL, NULL, rtcp_buf, rtcp_len, 1, 1, ssrc); /* Send the packet */ socklen_t addrlen = publisher->rtcp_addr.ss_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); int sent = 0; @@ -3198,7 +3205,7 @@ static json_t *janus_videoroom_subscriber_offer(janus_videoroom_subscriber *subs JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_PLAYOUT_DELAY, (stream->type == JANUS_VIDEOROOM_MEDIA_VIDEO && (ps && ps->playout_delay_extmap_id > 0)) ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_PLAYOUT_DELAY) : 0, JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_TRANSPORT_WIDE_CC, - (stream->type == JANUS_VIDEOROOM_MEDIA_VIDEO && subscriber->room->transport_wide_cc_ext) ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_TRANSPORT_WIDE_CC) : 0, + (subscriber->room->transport_wide_cc_ext) ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_TRANSPORT_WIDE_CC) : 0, JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_ABS_SEND_TIME, (stream->type == JANUS_VIDEOROOM_MEDIA_VIDEO) ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_ABS_SEND_TIME) : 0, /* TODO Add other properties from original SDP */ @@ -4113,6 +4120,21 @@ json_t *janus_videoroom_query_session(janus_plugin_session *handle) { json_object_set_new(m, "audio-level-dBov", json_integer(ps->audio_dBov_level)); json_object_set_new(m, "talking", ps->talking ? json_true() : json_false()); } + if(ps->bitrates) { + int i=0; + json_t *bitrates = NULL; + for(i=0; i<9; i++) { + if(ps->bitrates->packets[i]) { + if(bitrates == NULL) + bitrates = json_object(); + char name[2]; + g_snprintf(name, sizeof(name), "%d", i); + json_object_set_new(bitrates, name, json_integer(ps->bitrates->bitrate[i])); + } + } + if(bitrates != NULL) + json_object_set_new(m, "bitrates", bitrates); + } janus_mutex_lock(&ps->subscribers_mutex); json_object_set_new(m, "subscribers", json_integer(g_slist_length(ps->subscribers))); janus_mutex_unlock(&ps->subscribers_mutex); @@ -7226,6 +7248,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi ps->vcodec = JANUS_VIDEOCODEC_NONE; ps->min_delay = -1; ps->max_delay = -1; + ps->bitrates = janus_bwe_stream_bitrate_create(); if(mtype == JANUS_VIDEOROOM_MEDIA_AUDIO) { ps->acodec = janus_audiocodec_from_name(codec); ps->pt = janus_audiocodec_pt(ps->acodec); @@ -7502,6 +7525,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi ps->vcodec = JANUS_VIDEOCODEC_NONE; ps->min_delay = -1; ps->max_delay = -1; + ps->bitrates = janus_bwe_stream_bitrate_create(); if(mtype == JANUS_VIDEOROOM_MEDIA_AUDIO) { ps->acodec = janus_audiocodec_from_name(codec); ps->pt = janus_audiocodec_pt(ps->acodec); @@ -7872,6 +7896,8 @@ void janus_videoroom_setup_media(janus_plugin_session *handle) { } else if(session->participant_type == janus_videoroom_p_type_subscriber) { janus_videoroom_subscriber *s = janus_videoroom_session_get_subscriber(session); if(s && s->streams) { + /* FIXME Enable bandwidth estimation for this session */ + gateway->enable_bwe(session->handle); /* Send a PLI for all the video streams we subscribed to */ GList *temp = s->streams; while(temp) { @@ -8039,11 +8065,14 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi /* Save the frame if we're recording */ if(!video || !ps->simulcast) { janus_recorder_save_frame(ps->rc, buf, len); + janus_bwe_stream_bitrate_update(ps->bitrates, janus_get_monotonic_time(), 0, 0, len); } else { /* We're simulcasting, save the best video quality */ gboolean save = janus_rtp_simulcasting_context_process_rtp(&ps->rec_simctx, buf, len, pkt->extensions.dd_content, pkt->extensions.dd_len, ps->vssrc, ps->rid, ps->vcodec, &ps->rec_ctx, &ps->rid_mutex); + janus_bwe_stream_bitrate_update(ps->bitrates, janus_get_monotonic_time(), + ps->rec_simctx.substream_last, ps->rec_simctx.temporal_last, len); if(save) { uint32_t seq_number = ntohs(rtp->seq_number); uint32_t timestamp = ntohl(rtp->timestamp); @@ -8397,6 +8426,161 @@ void janus_videoroom_slow_link(janus_plugin_session *handle, int mindex, gboolea janus_refcount_decrease(&session->ref); } +void janus_videoroom_estimated_bandwidth(janus_plugin_session *handle, uint32_t estimate) { + /* The core is informing us that our peer got or sent too many NACKs, are we pushing media too hard? */ + if(handle == NULL || g_atomic_int_get(&handle->stopped) || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) + return; + janus_mutex_lock(&sessions_mutex); + janus_videoroom_session *session = janus_videoroom_lookup_session(handle); + if(!session || g_atomic_int_get(&session->destroyed) || !session->participant) { + janus_mutex_unlock(&sessions_mutex); + return; + } + janus_refcount_increase(&session->ref); + janus_mutex_unlock(&sessions_mutex); + /* This will only make sense for subscribers, publishers have their own BWE */ + if(session->participant_type == janus_videoroom_p_type_subscriber) { + janus_videoroom_subscriber *subscriber = janus_videoroom_session_get_subscriber(session); + if(subscriber == NULL) { + janus_refcount_decrease(&session->ref); + return; + } + if(g_atomic_int_get(&subscriber->destroyed)) { + janus_refcount_decrease(&subscriber->ref); + janus_refcount_decrease(&session->ref); + return; + } + JANUS_LOG(LOG_WARN, "[videoroom] BWE=%"SCNu32"\n", estimate); + janus_mutex_lock(&subscriber->streams_mutex); + GList *temp = subscriber->streams; + gint64 now = janus_get_monotonic_time(); + while(temp) { + janus_videoroom_subscriber_stream *s = (janus_videoroom_subscriber_stream *)temp->data; + if(s->type == JANUS_VIDEOROOM_MEDIA_DATA) { + temp = temp->next; + continue; + } + if(s->last_bwe_drop > 0 && s->last_bwe_drop < now) + s->last_bwe_drop = 0; + GSList *list = s->publisher_streams; + if(list) { + janus_videoroom_publisher_stream *ps = list->data; + if(ps && ps->publisher != NULL && ps->bitrates) { + /* FIXME Check the bandwidth usage for this stream */ + if(s->type == JANUS_VIDEOROOM_MEDIA_AUDIO && ps->bitrates->packets[0]) { + if(estimate < ps->bitrates->bitrate[0]) { + estimate = 0; + JANUS_LOG(LOG_WARN, "Insufficient bandwidth for stream %d (audio): %"SCNu32" < %"SCNu32"\n", + s->mindex, estimate, ps->bitrates->bitrate[0]); + } else { + //~ estimate -= ps->bitrates->bitrate[0]; + } + } else if(s->type == JANUS_VIDEOROOM_MEDIA_VIDEO) { + if(ps->simulcast) { + JANUS_LOG(LOG_DBG, "current=%d/%d, target=%d/%d, bwe=%d/%d\n", + s->sim_context.substream, s->sim_context.templayer, + s->sim_context.substream_target, s->sim_context.templayer_target, + s->sim_context.substream_target_bwe, s->sim_context.templayer_target_bwe); + /* FIXME Simulcast, check the current targets, and retarget if needed */ + int substream = (s->sim_context.substream_target_temp == -1) ? + s->sim_context.substream_target : s->sim_context.substream_target_temp; + if(substream < 0) + substream = 0; + int templayer = s->sim_context.templayer_target; + if(templayer < 0) + templayer = 0; + int target = 3*substream + templayer; + if(target > 8) + target = 8; + /* Check if the target is actually available */ + while(ps->bitrates->packets[target] == NULL) { + templayer--; + if(templayer < 0) { + substream--; + if(substream < 0) { + substream = 0; + } else { + templayer = 2; + } + } + target = 3*substream + templayer; + } + if(estimate < (ps->bitrates->bitrate[target] + 20000)) { + /* Unavailable layer, or we don't have room for these layers, find one below that fits */ + if(target == 0) { + estimate = 0; + JANUS_LOG(LOG_WARN, "Insufficient bandwidth for simulcast stream %d (video)\n", s->mindex); + } else { + gboolean found = FALSE; + int old_target = target; + int new_substream = 0, new_templayer = 0; + while(target > 0) { + target--; + new_substream = target/3; + new_templayer = target%3; + if(ps->bitrates->packets[target] && estimate >= ps->bitrates->bitrate[target]) { + /* If we're going up, make sure we're not in the cooldown period */ + if(s->sim_context.substream_target_bwe != -1 && + new_substream > s->sim_context.substream_target_bwe && + now < s->last_bwe_drop) { + /* We are, ignore this layer */ + continue; + } + found = TRUE; + break; + } + } + if(!found) { + estimate = 0; + JANUS_LOG(LOG_WARN, "Insufficient bandwidth for simulcast stream %d (video)\n", s->mindex); + } else { + if(s->sim_context.substream_target_bwe == -1 || s->sim_context.substream_target_bwe > new_substream || + s->sim_context.templayer_target_bwe == -1 || s->sim_context.templayer_target_bwe > new_templayer) { + JANUS_LOG(LOG_WARN, "Insufficient bandwidth for simulcast %d/%d of stream %d (%"SCNu32" < %"SCNu32"), switching to %d/%d (%"SCNu32")\n", + substream, templayer, s->mindex, estimate, ps->bitrates->bitrate[old_target], + new_substream, new_templayer, ps->bitrates->bitrate[target]); + /* If BWE made us go down, implement a cooldown period of about 5 seconds */ + if(s->sim_context.substream_target_bwe == -1 || + new_substream < s->sim_context.substream_target_bwe) + s->last_bwe_drop = now + 5*G_USEC_PER_SEC; + } + estimate -= ps->bitrates->bitrate[target]; + s->sim_context.substream_target_bwe = new_substream; + s->sim_context.templayer_target_bwe = new_templayer; + /* Request a PLI if needed */ + if(s->sim_context.substream_target_bwe != new_substream) + janus_videoroom_reqpli(ps, "Simulcasting substream change (BWE)"); + } + } + } else { + estimate -= ps->bitrates->bitrate[target]; + s->sim_context.substream_target_bwe = -1; + s->sim_context.templayer_target_bwe = -1; + } + } else if(ps->svc) { + /* TODO SVC */ + } else { + /* No simulcast, no SVC */ + if(ps->bitrates->packets[0]) { + if(estimate < ps->bitrates->bitrate[0]) { + estimate = 0; + JANUS_LOG(LOG_WARN, "Insufficient bandwidth for stream %d (video)\n", s->mindex); + } else { + estimate -= ps->bitrates->bitrate[0]; + } + } + } + } + } + } + temp = temp->next; + } + janus_mutex_unlock(&subscriber->streams_mutex); + janus_refcount_decrease(&subscriber->ref); + } + janus_refcount_decrease(&session->ref); +} + static void janus_videoroom_recorder_create(janus_videoroom_publisher_stream *ps) { char filename[255]; janus_recorder *rc = NULL; @@ -11849,6 +12033,7 @@ static void *janus_videoroom_handler(void *data) { ps->pt = -1; ps->min_delay = -1; ps->max_delay = -1; + ps->bitrates = janus_bwe_stream_bitrate_create(); g_atomic_int_set(&ps->destroyed, 0); janus_refcount_init(&ps->ref, janus_videoroom_publisher_stream_free); janus_refcount_increase(&ps->ref); /* This is for the mid-indexed hashtable */ @@ -12444,6 +12629,27 @@ static void janus_videoroom_relay_rtp_packet(gpointer data, gpointer user_data) gateway->push_event(subscriber->session->handle, &janus_videoroom_plugin, NULL, event, NULL); json_decref(event); } + if(stream->sim_context.changed_substream || stream->sim_context.changed_temporal) { + /* FIXME We changed substream, check if we need to reconfigure the bitrate target for probing */ + if(stream->sim_context.substream < stream->sim_context.substream_target || + (stream->sim_context.substream == stream->sim_context.substream_target && + stream->sim_context.templayer < stream->sim_context.templayer_target)) { + /* We're not receiving what we need, aim for a higher layer */ + uint8_t current_layer = stream->sim_context.substream*3 + stream->sim_context.templayer; + uint8_t target_layer = stream->sim_context.substream_target*3 + stream->sim_context.templayer_target; + uint32_t target = 0; + uint8_t i = 0; + JANUS_LOG(LOG_WARN, "[videoroom] current=%d, target=%d\n", current_layer, target_layer); + for(i = current_layer+1; i<=target_layer; i++) { + JANUS_LOG(LOG_WARN, "[videoroom] -- %d --> %"SCNu32"\n", i, (ps->bitrates->packets[i] ? ps->bitrates->bitrate[i] : 0)); + if(ps->bitrates->packets[i]) + target = ps->bitrates->bitrate[i]; + if(target > 0) + break; + } + gateway->set_bwe_target(subscriber->session->handle, target ? (target + 50000) : 0); + } + } /* If we got here, update the RTP header and send the packet */ janus_rtp_header_update(packet->data, &stream->context, TRUE, 0); char vp8pd[6]; diff --git a/src/plugins/plugin.h b/src/plugins/plugin.h index 18c4b9b3ad..b3cac0f0d4 100644 --- a/src/plugins/plugin.h +++ b/src/plugins/plugin.h @@ -208,6 +208,7 @@ static janus_plugin janus_echotest_plugin = .incoming_data = NULL, \ .data_ready = NULL, \ .slow_link = NULL, \ + .estimated_bandwidth = NULL, \ .hangup_media = NULL, \ .destroy_session = NULL, \ .query_session = NULL, \ @@ -335,6 +336,11 @@ struct janus_plugin { * @param[in] uplink Whether this is related to the uplink (Janus to peer) * or downlink (peer to Janus) */ void (* const slow_link)(janus_plugin_session *handle, int mindex, gboolean video, gboolean uplink); + /*! \brief Callback to be notified about the estimated outgoing bandwidth + * on this PeerConnection, e.g., for simulcast/SVC automated switches + * @param[in] handle The plugin/gateway session used for this peer + * @param[in] estimate The estimated bandwidth in bps */ + void (* const estimated_bandwidth)(janus_plugin_session *handle, uint32_t estimate); /*! \brief Callback to be notified about DTLS alerts from a peer (i.e., the PeerConnection is not valid any more) * @param[in] handle The plugin/gateway session used for this peer */ void (* const hangup_media)(janus_plugin_session *handle); @@ -399,6 +405,23 @@ struct janus_callbacks { * @param[in] bitrate The bitrate value to send in the REMB message */ void (* const send_remb)(janus_plugin_session *handle, guint32 bitrate); + /*! \brief Create a new bandwidth estimation context for this session + * \note A call to this method will result in the core invoking the + * estimated_bandwidth callback on a regular basis for this session + * @param[in] handle The plugin/gateway session to enable BWE for */ + void (* const enable_bwe)(janus_plugin_session *handle); + /*! \brief Configure the target bitrate in this session, to generate + * probing for bandwidth estimation purposes (0 disables probing) + * \note The request will be ignored if no BWE context is enabled for this session. + * Also notice that probing may be paused at any time by the core, whether it + * was enabled or not, e.g., in case congestion or losses are detected + * @param[in] handle The plugin/gateway session to enable BWE probing for + * @param[in] target The bitrate to target (e.g., next simulcast layer) */ + void (* const set_bwe_target)(janus_plugin_session *handle, uint32_t bitrate); + /*! \brief Get rid of the bandwidth estimation context for this session + * @param[in] handle The plugin/gateway session to disnable BWE for */ + void (* const disable_bwe)(janus_plugin_session *handle); + /*! \brief Callback to ask the core to close a WebRTC PeerConnection * \note A call to this method will result in the core invoking the hangup_media * callback on this plugin when done, but only if a PeerConnection had been diff --git a/src/rtcp.c b/src/rtcp.c index 637c65cf8e..fa20c1acad 100644 --- a/src/rtcp.c +++ b/src/rtcp.c @@ -23,25 +23,6 @@ #include "utils.h" -/* Transport CC statuses */ -typedef enum janus_rtp_packet_status { - janus_rtp_packet_status_notreceived = 0, - janus_rtp_packet_status_smalldelta = 1, - janus_rtp_packet_status_largeornegativedelta = 2, - janus_rtp_packet_status_reserved = 3 -} janus_rtp_packet_status; -static const char *janus_rtp_packet_status_description(janus_rtp_packet_status status) { - switch(status) { - case janus_rtp_packet_status_notreceived: return "notreceived"; - case janus_rtp_packet_status_smalldelta: return "smalldelta"; - case janus_rtp_packet_status_largeornegativedelta: return "largeornegativedelta"; - case janus_rtp_packet_status_reserved: return "reserved"; - default: break; - } - return NULL; -} - - gboolean janus_is_rtcp(char *buf, guint len) { if (len < 8) return FALSE; @@ -49,8 +30,8 @@ gboolean janus_is_rtcp(char *buf, guint len) { return ((header->type >= 64) && (header->type < 96)); } -int janus_rtcp_parse(janus_rtcp_context *ctx, char *packet, int len) { - return janus_rtcp_fix_ssrc(ctx, packet, len, 0, 0, 0); +int janus_rtcp_parse(janus_rtcp_context *ctx, janus_bwe_context *bwe, char *packet, int len) { + return janus_rtcp_fix_ssrc(ctx, bwe, packet, len, 0, 0, 0); } guint32 janus_rtcp_get_sender_ssrc(char *packet, int len) { @@ -226,9 +207,9 @@ static void janus_rtcp_incoming_sr(janus_rtcp_context *ctx, janus_rtcp_sr *sr) { ctx->lsr = (ntp >> 16); } -/* Helper to handle an incoming transport-cc feedback: triggered by a call to janus_rtcp_fix_ssrc a valid context pointer */ -static void janus_rtcp_incoming_transport_cc(janus_rtcp_context *ctx, janus_rtcp_fb *twcc, int total) { - if(ctx == NULL || twcc == NULL || total < 20) +/* Helper to handle an incoming transport-cc feedback: triggered by a call to janus_rtcp_fix_ssrc */ +static void janus_rtcp_incoming_transport_cc(janus_rtcp_context *ctx, janus_bwe_context *bwe, janus_rtcp_fb *twcc, int total) { + if(ctx == NULL || bwe == NULL || twcc == NULL || total < 20) return; if(!janus_rtcp_check_fci((janus_rtcp_header *)twcc, total, 4)) return; @@ -266,7 +247,7 @@ static void janus_rtcp_incoming_transport_cc(janus_rtcp_context *ctx, janus_rtcp s = (chunk & 0x6000) >> 13; length = (chunk & 0x1FFF); JANUS_LOG(LOG_HUGE, " [%"SCNu16"] t=run-length, s=%s, l=%"SCNu8"\n", num, - janus_rtp_packet_status_description(s), length); + janus_bwe_twcc_status_description(s), length); while(length > 0 && psc > 0) { list = g_list_prepend(list, GUINT_TO_POINTER(s)); length--; @@ -280,7 +261,7 @@ static void janus_rtcp_incoming_transport_cc(janus_rtcp_context *ctx, janus_rtcp ss ? "2-bit" : "bit", length); while(length > 0 && psc > 0) { if(!ss) - s = (chunk & (1 << (length-1))) ? janus_rtp_packet_status_smalldelta : janus_rtp_packet_status_notreceived; + s = (chunk & (1 << (length-1))) ? janus_bwe_twcc_status_smalldelta : janus_bwe_twcc_status_notreceived; else s = (chunk & (3 << (2*length-2))) >> (2*length-2); list = g_list_prepend(list, GUINT_TO_POINTER(s)); @@ -300,35 +281,44 @@ static void janus_rtcp_incoming_transport_cc(janus_rtcp_context *ctx, janus_rtcp /* Iterate on all recv deltas */ JANUS_LOG(LOG_HUGE, "[TWCC] Recv Deltas (%d/%"SCNu16"):\n", g_list_length(list), status_count); num = 0; - uint16_t delta = 0; - uint32_t delta_us = 0; + int16_t delta = 0, seq = 0; + int64_t delta_us = 0; GList *iter = list; + gboolean first_recv_found = FALSE; while(iter != NULL && total > 0) { num++; delta = 0; + seq = (uint16_t)(base_seq+num-1); + /* Get the delta */ s = GPOINTER_TO_UINT(iter->data); - if(s == janus_rtp_packet_status_smalldelta) { - /* Small delta = 1 byte */ - delta = *data; + if(s == janus_bwe_twcc_status_smalldelta) { + /* Small delta = unsigned 1 byte */ + delta = (int16_t)*data; total--; data++; - } else if(s == janus_rtp_packet_status_largeornegativedelta) { - /* Large or negative delta = 2 bytes */ + } else if(s == janus_bwe_twcc_status_largeornegativedelta) { + /* Large or negative delta = signed 2 bytes */ if(total < 2) break; - memcpy(&delta, data, sizeof(uint16_t)); + memcpy(&delta, data, sizeof(int16_t)); delta = ntohs(delta); total -= 2; data += 2; } delta_us = delta*250; - /* Print summary */ - JANUS_LOG(LOG_HUGE, " [%02"SCNu16"][%"SCNu16"] %s (%"SCNu32"us)\n", num, (uint16_t)(base_seq+num-1), - janus_rtp_packet_status_description(s), delta_us); + gboolean first = FALSE; + if(!first_recv_found && s != janus_bwe_twcc_status_notreceived) { + first_recv_found = TRUE; + first = TRUE; + } + /* Pass the feedback to the bandwidth estimation context */ + janus_bwe_context_handle_feedback(bwe, seq, s, delta_us, first); + /* Move to the next feedback */ iter = iter->next; } - /* TODO Update the context with the feedback we got */ g_list_free(list); + /* This callback is for updating the state of the bandwidth estimator */ + janus_bwe_context_update(bwe); } /* Link quality estimate filter coefficient */ @@ -520,7 +510,7 @@ gboolean janus_rtcp_check_remb(janus_rtcp_header *rtcp, int len) { return TRUE; } -int janus_rtcp_fix_ssrc(janus_rtcp_context *ctx, char *packet, int len, int fixssrc, uint32_t newssrcl, uint32_t newssrcr) { +int janus_rtcp_fix_ssrc(janus_rtcp_context *ctx, janus_bwe_context *bwe, char *packet, int len, int fixssrc, uint32_t newssrcl, uint32_t newssrcr) { if(packet == NULL || len <= 0) return -1; janus_rtcp_header *rtcp = (janus_rtcp_header *)packet; @@ -648,7 +638,7 @@ int janus_rtcp_fix_ssrc(janus_rtcp_context *ctx, char *packet, int len, int fixs } } else if(fmt == 15) { /* transport-cc */ /* If an RTCP context was provided, parse this transport-cc feedback */ - janus_rtcp_incoming_transport_cc(ctx, rtcpfb, total); + janus_rtcp_incoming_transport_cc(ctx, bwe, rtcpfb, total); } else { JANUS_LOG(LOG_HUGE, " #%d ??? -- RTPFB (205, fmt=%d)\n", pno, fmt); } @@ -1640,13 +1630,13 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr /* Store delta array */ GQueue *deltas = g_queue_new(); GQueue *statuses = g_queue_new(); - janus_rtp_packet_status last_status = janus_rtp_packet_status_reserved; - janus_rtp_packet_status max_status = janus_rtp_packet_status_notreceived; + janus_bwe_twcc_status last_status = janus_bwe_twcc_status_reserved; + janus_bwe_twcc_status max_status = janus_bwe_twcc_status_notreceived; gboolean all_same = TRUE; /* For each packet */ while (stat != NULL) { - janus_rtp_packet_status status = janus_rtp_packet_status_notreceived; + janus_bwe_twcc_status status = janus_bwe_twcc_status_notreceived; /* If got packet */ if (stat->timestamp) { @@ -1672,10 +1662,10 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr /* If it is negative or too big */ if (delta<0 || delta> 255) { /* Big one */ - status = janus_rtp_packet_status_largeornegativedelta; + status = janus_bwe_twcc_status_largeornegativedelta; } else { /* Small */ - status = janus_rtp_packet_status_smalldelta; + status = janus_bwe_twcc_status_smalldelta; } /* Store delta */ /* Overflows are possible here */ @@ -1685,7 +1675,7 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr } /* Check if all previoues ones were equal and this one the first different */ - if (all_same && last_status!=janus_rtp_packet_status_reserved && status!=last_status) { + if (all_same && last_status != janus_bwe_twcc_status_reserved && status != last_status) { /* How big was the same run */ if (g_queue_get_length(statuses)>7) { guint32 word = 0; @@ -1707,8 +1697,8 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr /* Remove all statuses */ g_queue_clear(statuses); /* Reset status */ - last_status = janus_rtp_packet_status_reserved; - max_status = janus_rtp_packet_status_notreceived; + last_status = janus_bwe_twcc_status_reserved; + max_status = janus_bwe_twcc_status_notreceived; all_same = TRUE; } else { /* Not same */ @@ -1730,7 +1720,7 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr /* Check if we can still be enqueuing for a run */ if (!all_same) { /* Check */ - if (!all_same && max_status==janus_rtp_packet_status_largeornegativedelta && g_queue_get_length(statuses)>6) { + if (!all_same && max_status == janus_bwe_twcc_status_largeornegativedelta && g_queue_get_length(statuses)>6) { guint32 word = 0; /* 0 1 @@ -1747,7 +1737,7 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr size_t i = 0; for (i=0;i<7;++i) { /* Get status */ - janus_rtp_packet_status status = (janus_rtp_packet_status) GPOINTER_TO_UINT(g_queue_pop_head (statuses)); + janus_bwe_twcc_status status = (janus_bwe_twcc_status) GPOINTER_TO_UINT(g_queue_pop_head (statuses)); /* Write */ word = janus_push_bits(word, 2, (guint8)status); } @@ -1755,21 +1745,21 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr janus_set2(data, len, word); len += 2; /* Reset */ - last_status = janus_rtp_packet_status_reserved; - max_status = janus_rtp_packet_status_notreceived; + last_status = janus_bwe_twcc_status_reserved; + max_status = janus_bwe_twcc_status_notreceived; all_same = TRUE; /* We need to restore the values, as there may be more elements on the buffer */ for (i=0; imax_status) { /* Store it */ max_status = status; } //Check if it is the same */ - if (all_same && last_status!=janus_rtp_packet_status_reserved && status!=last_status) { + if (all_same && last_status != janus_bwe_twcc_status_reserved && status!=last_status) { /* Not the same */ all_same = FALSE; } @@ -1793,7 +1783,7 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr guint32 i = 0; for (i=0;i<14;++i) { /* Get status */ - janus_rtp_packet_status status = (janus_rtp_packet_status) GPOINTER_TO_UINT(g_queue_pop_head (statuses)); + janus_bwe_twcc_status status = (janus_bwe_twcc_status) GPOINTER_TO_UINT(g_queue_pop_head (statuses)); /* Write */ word = janus_push_bits(word, 1, (guint8)status); } @@ -1801,8 +1791,8 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr janus_set2(data, len, word); len += 2; /* Reset */ - last_status = janus_rtp_packet_status_reserved; - max_status = janus_rtp_packet_status_notreceived; + last_status = janus_bwe_twcc_status_reserved; + max_status = janus_bwe_twcc_status_notreceived; all_same = TRUE; } } @@ -1828,7 +1818,7 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr /* Write word */ janus_set2(data, len, word); len += 2; - } else if (max_status == janus_rtp_packet_status_largeornegativedelta) { + } else if (max_status == janus_bwe_twcc_status_largeornegativedelta) { guint32 word = 0; /* Write chunk */ word = janus_push_bits(word, 1, 1); @@ -1837,7 +1827,7 @@ int janus_rtcp_transport_wide_cc_feedback(char *packet, size_t size, guint32 ssr unsigned int i = 0; for (i=0;i #include +#include "bwe.h" + /*! \brief RTCP Packet Types (http://www.networksorcery.com/enp/protocol/rtcp.htm) */ typedef enum { RTCP_FIR = 192, @@ -287,14 +289,12 @@ typedef struct rtcp_context typedef rtcp_context janus_rtcp_context; /*! \brief Stores transport wide packet reception statistics */ -typedef struct rtcp_transport_wide_cc_stats -{ +typedef struct janus_rtcp_transport_wide_cc_stats { /*! \brief Transwport wide sequence number */ guint32 transport_seq_num; /*! \brief Reception time */ guint64 timestamp; -} rtcp_transport_wide_cc_stats; -typedef rtcp_transport_wide_cc_stats janus_rtcp_transport_wide_cc_stats; +} janus_rtcp_transport_wide_cc_stats; /*! \brief Method to retrieve the estimated round-trip time from an existing RTCP context * @param[in] ctx The RTCP context to query @@ -379,10 +379,11 @@ gboolean janus_is_rtcp(char *buf, guint len); /*! \brief Method to parse/validate an RTCP message * @param[in] ctx RTCP context to update, if needed (optional) + * @param[in] bwe Bandwidth estimation context to update, if needed (optional) * @param[in] packet The message data * @param[in] len The message data length in bytes * @returns 0 in case of success, -1 on errors */ -int janus_rtcp_parse(janus_rtcp_context *ctx, char *packet, int len); +int janus_rtcp_parse(janus_rtcp_context *ctx, janus_bwe_context *bwe, char *packet, int len); /*! \brief Method to fix incoming RTCP SR and RR data * @param[in] packet The message data @@ -398,13 +399,14 @@ int janus_rtcp_fix_report_data(char *packet, int len, uint32_t base_ts, uint32_t /*! \brief Method to fix an RTCP message (http://tools.ietf.org/html/draft-ietf-straw-b2bua-rtcp-00) * @param[in] ctx RTCP context to update, if needed (optional) + * @param[in] bwe Bandwidth estimation context to update, if needed (optional) * @param[in] packet The message data * @param[in] len The message data length in bytes * @param[in] fixssrc Whether the method needs to fix the message or just parse it * @param[in] newssrcl The SSRC of the sender to put in the message * @param[in] newssrcr The SSRC of the receiver to put in the message * @returns 0 in case of success, -1 on errors */ -int janus_rtcp_fix_ssrc(janus_rtcp_context *ctx, char *packet, int len, int fixssrc, uint32_t newssrcl, uint32_t newssrcr); +int janus_rtcp_fix_ssrc(janus_rtcp_context *ctx, janus_bwe_context *bwe, char *packet, int len, int fixssrc, uint32_t newssrcl, uint32_t newssrcr); /*! \brief Method to filter an outgoing RTCP message (http://tools.ietf.org/html/draft-ietf-straw-b2bua-rtcp-00) * @param[in] packet The message data diff --git a/src/rtp.c b/src/rtp.c index 066159d741..2db236f820 100644 --- a/src/rtp.c +++ b/src/rtp.c @@ -1065,7 +1065,9 @@ void janus_rtp_simulcasting_context_reset(janus_rtp_simulcasting_context *contex context->rid_ext_id = -1; context->substream = -1; context->substream_target_temp = -1; + context->substream_target_bwe = -1; context->templayer = -1; + context->templayer_target_bwe = -1; } void janus_rtp_simulcasting_prepare(json_t *simulcast, int *rid_ext_id, uint32_t *ssrcs, char **rids) { @@ -1123,6 +1125,8 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte janus_videocodec vcodec, janus_rtp_switching_context *sc, janus_mutex *rid_mutex) { if(!context || !buf || len < 1) return FALSE; + context->substream_last = -1; + context->temporal_last = -1; janus_rtp_header *header = (janus_rtp_header *)buf; uint32_t ssrc = ntohl(header->ssrc); int substream = -1; @@ -1161,6 +1165,7 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte return FALSE; } } + context->substream_last = substream; /* Reset the flags */ context->changed_substream = FALSE; context->changed_temporal = FALSE; @@ -1180,7 +1185,10 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte context->substream_target_temp = -1; } int target = (context->substream_target_temp == -1) ? context->substream_target : context->substream_target_temp; + if(context->substream_target_bwe != -1 && context->substream_target_bwe < target) + target = context->substream_target_bwe; /* Check what we need to do with the packet */ + gboolean relay = TRUE; if(context->substream == -1) { if((vcodec == JANUS_VIDEOCODEC_VP8 && janus_vp8_is_keyframe(payload, plen)) || (vcodec == JANUS_VIDEOCODEC_VP9 && janus_vp9_is_keyframe(payload, plen)) || @@ -1193,7 +1201,7 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte context->last_relayed = now; } else { /* Don't relay anything until we get a keyframe */ - return FALSE; + relay = FALSE; } } else if(context->substream != target) { /* We're not on the substream we'd like: let's wait for a keyframe on the target */ @@ -1210,21 +1218,30 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte /* Notify the caller that the substream changed */ context->changed_substream = TRUE; context->last_relayed = now; + } else if(context->substream_target_bwe != -1 && context->substream > target && substream < context->substream) { + /* We need to go down because of BWE, don't wait for a keyframe (hopefully one will follow) */ + context->substream = substream; + /* Notify the caller that the substream changed */ + context->changed_substream = TRUE; + context->last_relayed = now; + //~ } else if(context->substream_target_bwe != -1 && context->substream > target) { + //~ /* We need to go down because of BWE but don't have a keyframe yet, don't relay anything */ + //~ relay = FALSE; } } /* If we haven't received our desired substream yet, let's drop temporarily */ - if(context->last_relayed == 0) { + if(relay && context->last_relayed == 0) { /* Let's start slow */ context->last_relayed = now; - } else if(context->substream > 0) { + } else if(relay && context->substream > 0) { /* Check if too much time went by with no packet relayed */ if((now - context->last_relayed) > (context->drop_trigger ? context->drop_trigger : 250000)) { context->last_relayed = now; if(context->substream != substream && context->substream_target_temp != 0) { - if(context->substream_target > substream) { + if(target > substream) { int prev_target = context->substream_target_temp; if(context->substream_target_temp == -1) - context->substream_target_temp = context->substream_target - 1; + context->substream_target_temp = target - 1; else context->substream_target_temp--; if(context->substream_target_temp < 0) @@ -1242,13 +1259,19 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte /* Do we need to drop this? */ if(context->substream < 0) return FALSE; - if(substream != context->substream) { + if(relay && substream != context->substream) { JANUS_LOG(LOG_HUGE, "Dropping packet (it's from SSRC %"SCNu32", but we're only relaying SSRC %"SCNu32" now\n", ssrc, *(ssrcs + context->substream)); - return FALSE; + relay = FALSE; } - context->last_relayed = janus_get_monotonic_time(); + if(relay) + context->last_relayed = janus_get_monotonic_time(); /* Temporal layers are only easily available for some codecs */ + int substream_target = target; + target = context->templayer_target; + if(context->templayer_target_bwe != -1 && context->templayer_target_bwe < target && + substream_target <= context->substream) + target = context->templayer_target_bwe; if(vcodec == JANUS_VIDEOCODEC_VP8) { /* Check if there's any temporal scalability to take into account */ gboolean m = FALSE; @@ -1259,9 +1282,14 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte uint8_t keyidx = 0; if(janus_vp8_parse_descriptor(payload, plen, &m, &picid, &tlzi, &tid, &ybit, &keyidx) == 0) { //~ JANUS_LOG(LOG_WARN, "%"SCNu16", %u, %u, %u, %u\n", picid, tlzi, tid, ybit, keyidx); - if(context->templayer != context->templayer_target && tid == context->templayer_target) { + context->temporal_last = tid; + if(!relay) + return FALSE; + if(context->templayer != target && (tid == target || + (target > tid && context->templayer < tid) || + (target < tid && context->templayer > tid))) { /* FIXME We should be smarter in deciding when to switch */ - context->templayer = context->templayer_target; + context->templayer = tid; /* Notify the caller that the temporal layer changed */ context->changed_temporal = TRUE; } @@ -1279,20 +1307,23 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte gboolean found = FALSE; janus_vp9_svc_info svc_info = { 0 }; if(janus_vp9_parse_svc(payload, plen, &found, &svc_info) == 0 && found) { + context->temporal_last = svc_info.temporal_layer; + if(!relay) + return FALSE; int temporal_layer = context->templayer; - if(context->templayer_target > context->templayer) { + if(target > context->templayer) { /* We need to upscale */ if(svc_info.ubit && svc_info.bbit && svc_info.temporal_layer > context->templayer && - svc_info.temporal_layer <= context->templayer_target) { + svc_info.temporal_layer <= target) { context->templayer = svc_info.temporal_layer; temporal_layer = context->templayer; context->changed_temporal = TRUE; } - } else if(context->templayer_target < context->templayer) { + } else if(target < context->templayer) { /* We need to downscale */ - if(svc_info.ebit && svc_info.temporal_layer == context->templayer_target) { - context->templayer = context->templayer_target; + if(svc_info.ebit && svc_info.temporal_layer == target) { + context->templayer = target; context->changed_temporal = TRUE; } } @@ -1315,18 +1346,21 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte if(janus_av1_svc_context_process_dd(av1ctx, dd_content, dd_len, &template, NULL)) { janus_av1_svc_template *t = g_hash_table_lookup(av1ctx->templates, GUINT_TO_POINTER(template)); if(t) { + context->temporal_last = t->temporal; + if(!relay) + return FALSE; int temporal_layer = context->templayer; - if(context->templayer_target > context->templayer) { + if(target > context->templayer) { /* We need to upscale */ - if(t->temporal > context->templayer && t->temporal <= context->templayer_target) { + if(t->temporal > context->templayer && t->temporal <= target) { context->templayer = t->temporal; temporal_layer = context->templayer; context->changed_temporal = TRUE; } - } else if(context->templayer_target < context->templayer) { + } else if(target < context->templayer) { /* We need to downscale */ - if(t->temporal == context->templayer_target) { - context->templayer = context->templayer_target; + if(t->temporal == target) { + context->templayer = target; context->changed_temporal = TRUE; } } @@ -1343,7 +1377,7 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte } } /* If we got here, the packet can be relayed */ - return TRUE; + return relay; } /* VP9 SVC */ diff --git a/src/rtp.h b/src/rtp.h index 17b28829aa..8348bdd593 100644 --- a/src/rtp.h +++ b/src/rtp.h @@ -390,11 +390,15 @@ typedef struct janus_rtp_simulcasting_context { /*! \brief Which simulcast substream we should forward back */ int substream; /*! \brief As above, but to handle transitions (e.g., wait for keyframe, or get this if available) */ - int substream_target, substream_target_temp; + int substream_target, substream_target_temp, substream_target_bwe; + /*! \brief Last substream that was identified by janus_rtp_simulcasting_context_process_rtp */ + int substream_last; /*! \brief Which simulcast temporal layer we should forward back */ int templayer; /*! \brief As above, but to handle transitions (e.g., wait for keyframe) */ - int templayer_target; + int templayer_target, templayer_target_bwe; + /*! \brief Last temporal layer that was identified by janus_rtp_simulcasting_context_process_rtp */ + int temporal_last; /*! \brief How much time (in us, default 250000) without receiving packets will make us drop to the substream below */ guint32 drop_trigger; /*! \brief When we relayed the last packet (used to detect when substreams become unavailable) */ diff --git a/src/sdp.c b/src/sdp.c index 7e81ed804f..23059533e8 100644 --- a/src/sdp.c +++ b/src/sdp.c @@ -575,6 +575,9 @@ int janus_sdp_process_remote(void *ice_handle, janus_sdp *remote_sdp, gboolean r if(a->value && strstr(a->value, "nack") && medium) { /* Enable NACKs */ medium->do_nacks = TRUE; + } else if(a->value && strstr(a->value, "transport-cc") && medium) { + /* Enable TWCC */ + medium->do_twcc = TRUE; } } else if(!strcasecmp(a->name, "fmtp")) { if(a->value && strstr(a->value, "apt=")) {