Skip to content

Commit

Permalink
[fix]fmp4解决播放异常而引起的卡住
Browse files Browse the repository at this point in the history
  • Loading branch information
joestarzxh committed Aug 10, 2023
1 parent 4d72913 commit 92f0a7a
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions fmp4/http-fmp4/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/yapingcat/gomedia/go-mp4"
)

var ErrWriteChanFull = errors.New("Fmp4 Session write channel full")

type fmp4WriterSeeker struct {
buffer []byte
offset int
Expand Down Expand Up @@ -92,6 +94,7 @@ type HttpFmp4Session struct {
fws *fmp4WriterSeeker
initVideoDts uint32
initAudioDts uint32
connCloseErr chan error
}

func NewHttpFmp4Session(streamid string) *HttpFmp4Session {
Expand All @@ -100,8 +103,9 @@ func NewHttpFmp4Session(streamid string) *HttpFmp4Session {
session := &HttpFmp4Session{
streamid: streamid,
subscriberId: uuid.NewV4().String(),
avPacketChan: make(chan Frame, 100),
avPacketChan: make(chan Frame, 144),
fws: newFmp4WriterSeeker(1024 * 1024),
connCloseErr: make(chan error, 1),
}

session.muxer, _ = mp4.CreateMp4Muxer(session.fws, mp4.WithMp4Flag(mp4.MP4_FLAG_FRAGMENT))
Expand Down Expand Up @@ -213,11 +217,10 @@ func (session *HttpFmp4Session) handleSession(c *gin.Context) {
return
}
session.hooks.AddConsumer(session.subscriberId, session)
connCloseErr := make(chan error, 1)
go func() {
readBuf := make([]byte, 1024)
if _, err = conn.Read(readBuf); err != nil {
connCloseErr <- err
session.connCloseErr <- err
}
}()

Expand Down Expand Up @@ -270,7 +273,8 @@ func (session *HttpFmp4Session) handleSession(c *gin.Context) {
session.muxer.ReBindWriter(fws)
session.fws = fws
}
case <-connCloseErr:
case err := <-session.connCloseErr:
nazalog.Errorf("fmp4 conn recv err:%s", err.Error())
session.OnStop()
return
}
Expand Down Expand Up @@ -311,7 +315,7 @@ func (session *HttpFmp4Session) OnMsg(msg base.RtmpMsg) {
}

func (session *HttpFmp4Session) OnStop() {
session.hooks.RemoveConsumer(session.streamid)
session.hooks.RemoveConsumer(session.subscriberId)
}

func (session *HttpFmp4Session) VideoMsg2AvPacket(msg base.RtmpMsg) {
Expand Down Expand Up @@ -397,8 +401,19 @@ func (session *HttpFmp4Session) VideoMsg2AvPacket(msg base.RtmpMsg) {
} else {
pkt.PayloadType = base.AvPacketPtHevc
}
select {
case session.avPacketChan <- pkt:
default:
//session.avPacketChan 满了直接退出,防止阻塞
if session.connCloseErr != nil {
select {
case session.connCloseErr <- ErrWriteChanFull:
default:

}
}
}

session.avPacketChan <- pkt
}
}

Expand Down Expand Up @@ -434,7 +449,18 @@ func (session *HttpFmp4Session) AudioMsg2AvPacket(msg base.RtmpMsg) {
Cts: msg.Cts(),
Payload: out,
}
select {
case session.avPacketChan <- pkt:
default:
//session.avPacketChan 满了直接退出,防止阻塞
if session.connCloseErr != nil {
select {
case session.connCloseErr <- ErrWriteChanFull:
default:

}

session.avPacketChan <- pkt
}
}
}
}

0 comments on commit 92f0a7a

Please sign in to comment.