Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of "data" stream parameters in the streaming plugin #3412

Merged
merged 5 commits into from
Sep 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions src/plugins/janus_streaming.c
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,7 @@ static struct janus_json_parameter rtp_media_parameters[] = {
{"port3", JANUS_JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"svc", JANUS_JSON_BOOL, 0},
/* Data only */
{"datatype", JANUS_JSON_STRING, 0},
{"buffermsg", JANUS_JSON_BOOL, 0},
};
static struct janus_json_parameter rtp_audio_parameters[] = {
Expand Down Expand Up @@ -2181,8 +2182,8 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
janus_config_item *vsps = janus_config_get(config, m, janus_config_type_item, "h264sps");
janus_config_item *vkf = janus_config_get(config, m, janus_config_type_item, "bufferkf");
janus_config_item *vsc = janus_config_get(config, m, janus_config_type_item, "simulcast");
janus_config_item *dbm = janus_config_get(config, cat, janus_config_type_item, "buffermsg");
janus_config_item *dt = janus_config_get(config, cat, janus_config_type_item, "datatype");
janus_config_item *dbm = janus_config_get(config, m, janus_config_type_item, "buffermsg");
janus_config_item *dt = janus_config_get(config, m, janus_config_type_item, "datatype");
janus_config_item *vport2 = janus_config_get(config, m, janus_config_type_item, "port2");
janus_config_item *vport3 = janus_config_get(config, m, janus_config_type_item, "port3");
janus_config_item *vsvc = janus_config_get(config, m, janus_config_type_item, "svc");
Expand Down Expand Up @@ -4682,7 +4683,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
if(stream->type == JANUS_STREAMING_MEDIA_DATA)
janus_config_add(config, m, janus_config_item_create("datatype", stream->textdata ? "text" : "binary"));
if(stream->buffermsg)
janus_config_add(config, m, janus_config_item_create("databuffermsg", "true"));
janus_config_add(config, m, janus_config_item_create("buffermsg", "true"));
temp = temp->next;
}
}
Expand Down Expand Up @@ -5587,15 +5588,6 @@ void janus_streaming_setup_media(janus_plugin_session *handle) {
}
janus_mutex_unlock(&stream->keyframe.mutex);
}
if(stream->buffermsg) {
JANUS_LOG(LOG_HUGE, "Any recent datachannel message to send? (%s)\n", stream->mid);
janus_mutex_lock(&stream->buffermsg_mutex);
if(stream->last_msg != NULL) {
JANUS_LOG(LOG_HUGE, "Yep!\n");
janus_streaming_relay_rtp_packet(session, stream->last_msg);
}
janus_mutex_unlock(&stream->buffermsg_mutex);
}
/* If this mountpoint has RTCP support, send a PLI */
if(stream->type == JANUS_STREAMING_MEDIA_VIDEO)
janus_streaming_rtcp_pli_send(stream);
Expand Down Expand Up @@ -5679,9 +5671,31 @@ void janus_streaming_data_ready(janus_plugin_session *handle) {
janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;
if(!session || g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup))
return;
janus_refcount_increase(&session->ref);
if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) {
JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_STREAMING_PACKAGE, handle);
/* Try to send a buffered datachannel message when datachannel is ready */
GList *temp = session->streams;
janus_streaming_session_stream *s;
janus_streaming_rtp_source_stream *stream;
while(temp) {
s = (janus_streaming_session_stream *)temp->data;
stream = s->stream;
if(stream->buffermsg) {
janus_refcount_increase(&stream->ref);
JANUS_LOG(LOG_VERB, "[%s-%p] Trying to send the most recent message (%s)\n", JANUS_STREAMING_PACKAGE, handle, stream->mid);
janus_mutex_lock(&stream->buffermsg_mutex);
if(stream->last_msg != NULL) {
JANUS_LOG(LOG_HUGE, "Buffered datachannel message found!\n");
janus_streaming_relay_rtp_packet(session, stream->last_msg);
}
janus_mutex_unlock(&stream->buffermsg_mutex);
janus_refcount_decrease(&stream->ref);
}
temp = temp->next;
}
}
janus_refcount_decrease(&session->ref);
}

void janus_streaming_hangup_media(janus_plugin_session *handle) {
Expand Down Expand Up @@ -9949,14 +9963,19 @@ static void *janus_streaming_relay_thread(void *data) {
/* Are we keeping track of the last message being relayed? */
if(stream->buffermsg) {
janus_mutex_lock(&stream->buffermsg_mutex);
if(stream->last_msg != NULL) {
janus_streaming_rtp_relay_packet_free((janus_streaming_rtp_relay_packet *)stream->last_msg);
stream->last_msg = NULL;
}
janus_streaming_rtp_relay_packet *pkt = g_malloc0(sizeof(janus_streaming_rtp_relay_packet));
pkt->data = g_malloc(bytes);
memcpy(pkt->data, data, bytes);
packet.mindex = stream->mindex;
packet.is_rtp = FALSE;
packet.is_data = TRUE;
packet.textdata = stream->textdata;
pkt->mindex = stream->mindex;
pkt->is_data = TRUE;
pkt->textdata = stream->textdata;
pkt->length = bytes;
/* Store the latest message */
stream->last_msg = pkt;
janus_mutex_unlock(&stream->buffermsg_mutex);
}
/* Go! */
Expand Down
Loading