Skip to content

Commit

Permalink
Add new participant mutex to VideoRoom (#3361)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero authored May 14, 2024
1 parent 9be6a74 commit 9b8b8d5
Showing 1 changed file with 42 additions and 4 deletions.
46 changes: 42 additions & 4 deletions src/plugins/janus_videoroom.c
Original file line number Diff line number Diff line change
Expand Up @@ -2149,6 +2149,7 @@ typedef struct janus_videoroom_publisher {
int udp_sock; /* The udp socket on which to forward rtp packets */
gboolean kicked; /* Whether this participant has been kicked */
gboolean e2ee; /* If media from this publisher is end-to-end encrypted */
janus_mutex mutex; /* Mutex to lock this instance */
volatile gint destroyed;
janus_refcount ref;
} janus_videoroom_publisher;
Expand Down Expand Up @@ -2497,6 +2498,7 @@ static void janus_videoroom_publisher_free(const janus_refcount *p_ref) {

janus_mutex_destroy(&p->subscribers_mutex);
janus_mutex_destroy(&p->rtp_forwarders_mutex);
janus_mutex_destroy(&p->mutex);

/* If this is a dummy publisher, get rid of the session too */
if(p->dummy && p->session)
Expand Down Expand Up @@ -2817,6 +2819,7 @@ static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashT
publisher->rtp_forwarders = g_hash_table_new(NULL, NULL);
publisher->udp_sock = -1;
g_atomic_int_set(&publisher->destroyed, 0);
janus_mutex_init(&publisher->mutex);
janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free);
/* Now we create a separate publisher stream for each supported codec in the room */
janus_videoroom_publisher_stream *ps = NULL;
Expand Down Expand Up @@ -4138,7 +4141,9 @@ static void janus_videoroom_leave_or_unpublish(janus_videoroom_publisher *partic
g_hash_table_remove(participant->room->participants,
string_ids ? (gpointer)participant->user_id_str : (gpointer)&participant->user_id);
g_hash_table_remove(participant->room->private_ids, GUINT_TO_POINTER(participant->pvt_id));
janus_mutex_lock(&participant->mutex);
g_clear_pointer(&participant->room, janus_videoroom_room_dereference);
janus_mutex_unlock(&participant->mutex);
}
janus_mutex_unlock(&room->mutex);
janus_refcount_decrease(&room->ref);
Expand Down Expand Up @@ -5195,7 +5200,9 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
while (g_hash_table_iter_next(&iter, NULL, &value)) {
janus_videoroom_publisher *p = value;
if(p && !g_atomic_int_get(&p->destroyed) && p->session && p->room) {
janus_mutex_lock(&p->mutex);
g_clear_pointer(&p->room, janus_videoroom_room_dereference);
janus_mutex_unlock(&p->mutex);
/* Notify the user we're going to destroy the room... */
int ret = gateway->push_event(p->session->handle, &janus_videoroom_plugin, NULL, destroyed, NULL);
JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret));
Expand Down Expand Up @@ -7407,6 +7414,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
publisher->rtp_forwarders = g_hash_table_new(NULL, NULL);
publisher->udp_sock = -1;
g_atomic_int_set(&publisher->destroyed, 0);
janus_mutex_init(&publisher->mutex);
janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free);
/* Create publisher streams for all the things that the remote publisher is sending */
janus_videoroom_publisher_stream *ps = NULL;
Expand Down Expand Up @@ -8120,11 +8128,19 @@ void janus_videoroom_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp
janus_videoroom_incoming_rtp_internal(session, participant, pkt);
}
static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *session, janus_videoroom_publisher *participant, janus_plugin_rtp *pkt) {
if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams || participant->room == NULL) {
if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams) {
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
janus_mutex_lock(&participant->mutex);
janus_videoroom *videoroom = participant->room;
if(videoroom == NULL) {
janus_mutex_unlock(&participant->mutex);
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
janus_refcount_increase_nodebug(&videoroom->ref);
janus_mutex_unlock(&participant->mutex);

/* Find the stream this packet belongs to */
janus_mutex_lock(&participant->streams_mutex);
Expand All @@ -8137,6 +8153,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi
if(ps != NULL)
janus_refcount_decrease_nodebug(&ps->ref);
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
return;
}

Expand Down Expand Up @@ -8281,6 +8298,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi
char *payload = janus_rtp_payload(buf, len, &plen);
if(payload == NULL) {
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
return;
}
if(ps->vcodec == JANUS_VIDEOCODEC_VP9) {
Expand Down Expand Up @@ -8351,6 +8369,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi
char *payload = janus_rtp_payload(buf, len, &plen);
if(payload == NULL) {
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
return;
}
if(ps->vcodec == JANUS_VIDEOCODEC_VP8) {
Expand Down Expand Up @@ -8378,6 +8397,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi
}
janus_refcount_decrease_nodebug(&ps->ref);
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
}

void janus_videoroom_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet) {
Expand Down Expand Up @@ -8450,12 +8470,22 @@ static void janus_videoroom_incoming_data_internal(janus_videoroom_session *sess
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams || participant->room == NULL) {
if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams) {
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
janus_mutex_lock(&participant->mutex);
janus_videoroom *videoroom = participant->room;
if(videoroom == NULL) {
janus_mutex_unlock(&participant->mutex);
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
janus_refcount_increase_nodebug(&videoroom->ref);
janus_mutex_unlock(&participant->mutex);
if(g_atomic_int_get(&participant->destroyed) || participant->data_mindex < 0 || !participant->streams || participant->kicked) {
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
return;
}
char *buf = packet->buffer;
Expand Down Expand Up @@ -8527,14 +8557,15 @@ static void janus_videoroom_incoming_data_internal(janus_videoroom_session *sess
pkt.is_rtp = FALSE;
pkt.textdata = !packet->binary;
janus_mutex_lock_nodebug(&ps->subscribers_mutex);
if(participant->room->helper_threads > 0) {
g_list_foreach(participant->room->threads, janus_videoroom_helper_rtpdata_packet, &pkt);
if(videoroom->helper_threads > 0) {
g_list_foreach(videoroom->threads, janus_videoroom_helper_rtpdata_packet, &pkt);
} else {
g_slist_foreach(ps->subscribers, janus_videoroom_relay_data_packet, &pkt);
}
janus_mutex_unlock_nodebug(&ps->subscribers_mutex);
janus_refcount_decrease_nodebug(&ps->ref);
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
}

void janus_videoroom_data_ready(janus_plugin_session *handle) {
Expand Down Expand Up @@ -8791,7 +8822,11 @@ static void janus_videoroom_hangup_media_internal(gpointer session_data) {
g_list_free_full(mappings, (GDestroyNotify)g_free);
}
/* Any subscriber session to update? */
janus_mutex_lock(&participant->mutex);
janus_videoroom *room = participant->room;
if(room)
janus_refcount_increase_nodebug(&room->ref);
janus_mutex_unlock(&participant->mutex);
if(subscribers != NULL) {
temp = subscribers;
while(temp) {
Expand Down Expand Up @@ -8855,6 +8890,8 @@ static void janus_videoroom_hangup_media_internal(gpointer session_data) {
janus_mutex_unlock(&participant->streams_mutex);
janus_videoroom_leave_or_unpublish(participant, FALSE, FALSE);
janus_refcount_decrease(&participant->ref);
if(room)
janus_refcount_decrease_nodebug(&room->ref);
} else if(session->participant_type == janus_videoroom_p_type_subscriber) {
/* Get rid of subscriber */
janus_videoroom_subscriber *subscriber = janus_videoroom_session_get_subscriber(session);
Expand Down Expand Up @@ -9189,6 +9226,7 @@ static void *janus_videoroom_handler(void *data) {
}
}
g_atomic_int_set(&publisher->destroyed, 0);
janus_mutex_init(&publisher->mutex);
janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free);
/* In case we also wanted to configure */
if(audiocodec && json_string_value(json_object_get(msg->jsep, "sdp")) != NULL) {
Expand Down

0 comments on commit 9b8b8d5

Please sign in to comment.