From 92f0a7aafe2d9637d170d86c5ab13215176b4b4e Mon Sep 17 00:00:00 2001 From: joestarzxh Date: Thu, 10 Aug 2023 23:30:40 +0800 Subject: [PATCH] =?UTF-8?q?[fix]fmp4=E8=A7=A3=E5=86=B3=E6=92=AD=E6=94=BE?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E8=80=8C=E5=BC=95=E8=B5=B7=E7=9A=84=E5=8D=A1?= =?UTF-8?q?=E4=BD=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fmp4/http-fmp4/session.go | 40 ++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/fmp4/http-fmp4/session.go b/fmp4/http-fmp4/session.go index 2c71927..178e03a 100644 --- a/fmp4/http-fmp4/session.go +++ b/fmp4/http-fmp4/session.go @@ -21,6 +21,8 @@ import ( "github.com/yapingcat/gomedia/go-mp4" ) +var ErrWriteChanFull = errors.New("Fmp4 Session write channel full") + type fmp4WriterSeeker struct { buffer []byte offset int @@ -92,6 +94,7 @@ type HttpFmp4Session struct { fws *fmp4WriterSeeker initVideoDts uint32 initAudioDts uint32 + connCloseErr chan error } func NewHttpFmp4Session(streamid string) *HttpFmp4Session { @@ -100,8 +103,9 @@ func NewHttpFmp4Session(streamid string) *HttpFmp4Session { session := &HttpFmp4Session{ streamid: streamid, subscriberId: uuid.NewV4().String(), - avPacketChan: make(chan Frame, 100), + avPacketChan: make(chan Frame, 144), fws: newFmp4WriterSeeker(1024 * 1024), + connCloseErr: make(chan error, 1), } session.muxer, _ = mp4.CreateMp4Muxer(session.fws, mp4.WithMp4Flag(mp4.MP4_FLAG_FRAGMENT)) @@ -213,11 +217,10 @@ func (session *HttpFmp4Session) handleSession(c *gin.Context) { return } session.hooks.AddConsumer(session.subscriberId, session) - connCloseErr := make(chan error, 1) go func() { readBuf := make([]byte, 1024) if _, err = conn.Read(readBuf); err != nil { - connCloseErr <- err + session.connCloseErr <- err } }() @@ -270,7 +273,8 @@ func (session *HttpFmp4Session) handleSession(c *gin.Context) { session.muxer.ReBindWriter(fws) session.fws = fws } - case <-connCloseErr: + case err := <-session.connCloseErr: + nazalog.Errorf("fmp4 conn recv err:%s", err.Error()) session.OnStop() return } @@ -311,7 +315,7 @@ func (session *HttpFmp4Session) OnMsg(msg base.RtmpMsg) { } func (session *HttpFmp4Session) OnStop() { - session.hooks.RemoveConsumer(session.streamid) + session.hooks.RemoveConsumer(session.subscriberId) } func (session *HttpFmp4Session) VideoMsg2AvPacket(msg base.RtmpMsg) { @@ -397,8 +401,19 @@ func (session *HttpFmp4Session) VideoMsg2AvPacket(msg base.RtmpMsg) { } else { pkt.PayloadType = base.AvPacketPtHevc } + select { + case session.avPacketChan <- pkt: + default: + //session.avPacketChan 满了直接退出,防止阻塞 + if session.connCloseErr != nil { + select { + case session.connCloseErr <- ErrWriteChanFull: + default: + + } + } + } - session.avPacketChan <- pkt } } @@ -434,7 +449,18 @@ func (session *HttpFmp4Session) AudioMsg2AvPacket(msg base.RtmpMsg) { Cts: msg.Cts(), Payload: out, } + select { + case session.avPacketChan <- pkt: + default: + //session.avPacketChan 满了直接退出,防止阻塞 + if session.connCloseErr != nil { + select { + case session.connCloseErr <- ErrWriteChanFull: + default: + + } - session.avPacketChan <- pkt + } + } } }