diff --git a/conf/config.go b/conf/config.go index 41760ba..489abde 100644 --- a/conf/config.go +++ b/conf/config.go @@ -65,7 +65,6 @@ type HlsConfig struct { type GB28181Config struct { Enable bool `json:"enable"` // gb28181使能标志 ListenAddr string `json:"listen_addr"` // gb28181监听地址 - SipNetwork string `json:"sip_network"` // 传输协议,默认UDP,可选TCP SipIP string `json:"sip_ip"` // sip 服务器公网IP SipPort uint16 `json:"sip_port"` // sip 服务器端口,默认 5060 Serial string `json:"serial"` // sip 服务器 id, 默认 34020000002000000001 diff --git a/conf/lalmax.conf.json b/conf/lalmax.conf.json index 9d39540..61aedbf 100644 --- a/conf/lalmax.conf.json +++ b/conf/lalmax.conf.json @@ -36,7 +36,6 @@ "realm": "3402000000", "sip_ip": "192.168.254.165", "sip_port": 5060, - "sip_network": "udp", "username": "", "media_config": { "media_ip": "192.168.254.165" diff --git a/gb28181/channel.go b/gb28181/channel.go index 26de32e..f606fc4 100644 --- a/gb28181/channel.go +++ b/gb28181/channel.go @@ -207,11 +207,9 @@ func (channel *Channel) Invite(opt *InviteOptions, streamName string, playInfo * nazalog.Error("invite failed, err:", err, " invite msg:", invite.String()) //jay 在media端口监听成功后,但是sip发送失败时 - if !playInfo.SinglePort { - if channel.observer != nil { - if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId); err != nil { - nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error()) - } + if channel.observer != nil { + if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId, ""); err != nil { + nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error()) } } @@ -249,15 +247,15 @@ func (channel *Channel) Invite(opt *InviteOptions, streamName string, playInfo * channel.ackReq = ackReq channel.playInfo = playInfo - err = sipsvr.Send(ackReq) + err = channel.device.sipSvr.Send(ackReq) } else { - if !playInfo.SinglePort { - if channel.observer != nil { - if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId); err != nil { - nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error()) - } + + if channel.observer != nil { + if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId, ""); err != nil { + nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error()) } } + } return } @@ -271,11 +269,9 @@ func (channel *Channel) GetCallId() string { } func (channel *Channel) stopMediaServer() (err error) { if channel.playInfo != nil { - if !channel.playInfo.SinglePort { - if channel.observer != nil { - if err = channel.observer.OnStopMediaServer(channel.playInfo.NetWork, channel.playInfo.SinglePort, channel.device.ID, channel.ChannelId); err != nil { - nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error()) - } + if channel.observer != nil { + if err = channel.observer.OnStopMediaServer(channel.playInfo.NetWork, channel.playInfo.SinglePort, channel.device.ID, channel.ChannelId, channel.playInfo.StreamName); err != nil { + nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error()) } } } @@ -288,21 +284,18 @@ func (channel *Channel) byeClear() (err error) { return } func (channel *Channel) Bye(streamName string) (err error) { - if channel.ackReq != nil { byeReq := channel.ackReq channel.ackReq = nil byeReq.SetMethod(sip.BYE) seq, _ := byeReq.CSeq() seq.SeqNo += 1 - sipsvr.Send(byeReq) + channel.device.sipSvr.Send(byeReq) } else { err = errors.New("channel has been closed") } - channel.stopMediaServer() return err - } func (channel *Channel) CreateRequst(Method sip.RequestMethod, conf config.GB28181Config) (req sip.Request) { d := channel.device @@ -357,7 +350,7 @@ func (channel *Channel) CreateRequst(Method sip.RequestMethod, conf config.GB281 nil, ) - req.SetTransport(conf.SipNetwork) + req.SetTransport(channel.device.network) req.SetDestination(d.NetAddr) return req } diff --git a/gb28181/device.go b/gb28181/device.go index 2b5e5ca..cd29e27 100644 --- a/gb28181/device.go +++ b/gb28181/device.go @@ -2,6 +2,7 @@ package gb28181 import ( "context" + "github.com/ghettovoice/gosip" "net/http" "strings" "sync" @@ -58,12 +59,20 @@ type Device struct { observer IMediaOpObserver conf config.GB28181Config + + network string + sipSvr gosip.Server } func (d *Device) WithMediaServer(observer IMediaOpObserver) { d.observer = observer } +func (d *Device) WithSipSvr(sipSvr gosip.Server) *Device { + d.sipSvr = sipSvr + return d +} + func (d *Device) syncChannels() { if time.Since(d.lastSyncTime) > 2*time.Second { d.lastSyncTime = time.Now() @@ -171,7 +180,7 @@ func (d *Device) CreateRequest(Method sip.RequestMethod, conf config.GB28181Conf nil, ) - req.SetTransport(conf.SipNetwork) + req.SetTransport(d.network) req.SetDestination(d.NetAddr) return } @@ -330,5 +339,5 @@ func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng str } func (d *Device) SipRequestForResponse(request sip.Request) (sip.Response, error) { - return sipsvr.RequestWithContext(context.Background(), request) + return d.sipSvr.RequestWithContext(context.Background(), request) } diff --git a/gb28181/mediaserver/conn.go b/gb28181/mediaserver/conn.go index c359c42..85d91bf 100644 --- a/gb28181/mediaserver/conn.go +++ b/gb28181/mediaserver/conn.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net" + "sync" "time" "github.com/q191201771/lalmax/gb28181/mpegps" @@ -49,6 +50,10 @@ type Conn struct { buffer *bytes.Buffer key string + + mediaServer *GB28181MediaServer + one sync.Once + oneSaveConn sync.Once } func NewConn(conn net.Conn, observer IGbObserver, lal logic.ILalServer) *Conn { @@ -65,13 +70,16 @@ func NewConn(conn net.Conn, observer IGbObserver, lal logic.ILalServer) *Conn { return c } +func (c *Conn) SetMediaServer(mediaServer *GB28181MediaServer) { + c.mediaServer = mediaServer +} func (c *Conn) SetKey(key string) { c.key = key } func (c *Conn) Serve() (err error) { defer func() { nazalog.Info("conn close, err:", err) - c.conn.Close() + c.Close() if c.observer != nil { c.observer.NotifyClose(c.streamName) @@ -85,7 +93,7 @@ func (c *Conn) Serve() (err error) { nazalog.Info("gb28181 conn, remoteaddr:", c.conn.RemoteAddr().String(), " localaddr:", c.conn.LocalAddr().String()) for { - c.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) + c.conn.SetReadDeadline(time.Now().Add(10 * time.Second)) pkt := &rtp.Packet{} if c.conn.RemoteAddr().Network() == "udp" { buf := make([]byte, 1472*4) @@ -137,6 +145,11 @@ func (c *Conn) Serve() (err error) { } c.check = true c.streamName = mediaInfo.StreamName + c.oneSaveConn.Do(func() { + if c.mediaServer != nil { + c.mediaServer.conns.Store(c.streamName, c) + } + }) if len(mediaInfo.DumpFileName) > 0 { c.psDumpFile = base.NewDumpFile() if err = c.psDumpFile.OpenToWrite(mediaInfo.DumpFileName); err != nil { @@ -253,7 +266,11 @@ func (c *Conn) OnFrame(frame []byte, cid mpegps.PsStreamType, pts uint64, dts ui c.lalSession.FeedAvPacket(pkt) } } - +func (c *Conn) Close() { + c.one.Do(func() { + c.conn.Close() + }) +} func getPayloadType(cid mpegps.PsStreamType) base.AvPacketPt { switch cid { case mpegps.PsStreamAac: diff --git a/gb28181/mediaserver/server.go b/gb28181/mediaserver/server.go index 1320e8b..c40258c 100644 --- a/gb28181/mediaserver/server.go +++ b/gb28181/mediaserver/server.go @@ -26,7 +26,7 @@ type GB28181MediaServer struct { observer IGbObserver mediaKey string - conn *Conn //增加链接对象,目前只适用于多端口 + conns sync.Map //增加链接对象,目前只适用于多端口 } func NewGB28181MediaServer(listenPort int, mediaKey string, observer IGbObserver, lal logic.ILalServer) *GB28181MediaServer { @@ -61,21 +61,29 @@ func (s *GB28181MediaServer) Start(listener net.Listener) (err error) { c := NewConn(conn, s.observer, s.lalServer) c.SetKey(s.mediaKey) - - s.conn = c - go c.Serve() + c.SetMediaServer(s) + go func() { + c.Serve() + s.conns.Delete(c.streamName) + }() } }() } return } +func (s *GB28181MediaServer) CloseConn(streamName string) { + if v, ok := s.conns.Load(streamName); ok { + conn := v.(*Conn) + conn.Close() + } +} func (s *GB28181MediaServer) Dispose() { s.disposeOnce.Do(func() { - - if s.conn != nil { - s.conn.conn.Close() - } - + s.conns.Range(func(_, value any) bool { + conn := value.(*Conn) + conn.Close() + return true + }) if s.listener != nil { s.listener.Close() s.listener = nil diff --git a/gb28181/server.go b/gb28181/server.go index cfc4242..9d93f60 100644 --- a/gb28181/server.go +++ b/gb28181/server.go @@ -4,11 +4,10 @@ import ( "bytes" "encoding/xml" "fmt" - "io/ioutil" - "math/rand" "net" "net/http" "strconv" + "strings" "sync" "time" @@ -22,13 +21,11 @@ import ( "github.com/q191201771/lal/pkg/logic" "github.com/q191201771/naza/pkg/nazalog" "golang.org/x/net/html/charset" - "golang.org/x/text/encoding/simplifiedchinese" - "golang.org/x/text/transform" ) type IMediaOpObserver interface { OnStartMediaServer(netWork string, singlePort bool, deviceId string, channelId string) *mediaserver.GB28181MediaServer - OnStopMediaServer(netWork string, singlePort bool, deviceId string, channelId string) error + OnStopMediaServer(netWork string, singlePort bool, deviceId string, channelId string, StreamName string) error } type GB28181Server struct { conf config.GB28181Config @@ -42,6 +39,9 @@ type GB28181Server struct { udpAvailConnPool *AvailConnPool tcpAvailConnPool *AvailConnPool + sipUdpSvr gosip.Server + sipTcpSvr gosip.Server + MediaServerMap sync.Map disposeOnce sync.Once } @@ -61,11 +61,6 @@ func NewGB28181Server(conf config.GB28181Config, lal logic.ILalServer) *GB28181S if conf.ListenAddr == "" { conf.ListenAddr = "0.0.0.0" } - - if conf.SipNetwork == "" { - conf.SipNetwork = "udp" - } - if conf.SipPort == 0 { conf.SipPort = 5060 } @@ -117,26 +112,30 @@ func NewGB28181Server(conf config.GB28181Config, lal logic.ILalServer) *GB28181S } func (s *GB28181Server) Start() { + s.sipUdpSvr = s.newSipServer("udp") + s.sipTcpSvr = s.newSipServer("tcp") + go s.startJob() +} +func (s *GB28181Server) newSipServer(network string) gosip.Server { srvConf := gosip.ServerConfig{} if s.conf.SipIP != "" { srvConf.Host = s.conf.SipIP } - sipsvr = gosip.NewServer(srvConf, nil, nil, logger) - sipsvr.OnRequest(sip.REGISTER, s.OnRegister) - sipsvr.OnRequest(sip.MESSAGE, s.OnMessage) - sipsvr.OnRequest(sip.NOTIFY, s.OnNotify) - sipsvr.OnRequest(sip.BYE, s.OnBye) + sipSvr := gosip.NewServer(srvConf, nil, nil, logger) + sipSvr.OnRequest(sip.REGISTER, s.OnRegister) + sipSvr.OnRequest(sip.MESSAGE, s.OnMessage) + sipSvr.OnRequest(sip.NOTIFY, s.OnNotify) + sipSvr.OnRequest(sip.BYE, s.OnBye) addr := s.conf.ListenAddr + ":" + strconv.Itoa(int(s.conf.SipPort)) - err := sipsvr.Listen(s.conf.SipNetwork, addr) + err := sipSvr.Listen(network, addr) if err != nil { nazalog.Fatal(err) } - nazalog.Info("gb28181 sip listen success, network:", s.conf.SipNetwork, " listen addr:", addr) - - go s.startJob() + nazalog.Info(" start sip server listen. addr= " + addr + " network:" + network) + return sipSvr } func (s *GB28181Server) Dispose() { s.disposeOnce.Do( @@ -146,6 +145,8 @@ func (s *GB28181Server) Dispose() { mediaServer.Dispose() return true }) + s.sipTcpSvr.Shutdown() + s.sipUdpSvr.Shutdown() }) } func (s *GB28181Server) OnStartMediaServer(netWork string, singlePort bool, deviceId string, channelId string) *mediaserver.GB28181MediaServer { @@ -218,7 +219,7 @@ func (s *GB28181Server) OnStartMediaServer(netWork string, singlePort bool, devi } return mediasvr } -func (s *GB28181Server) OnStopMediaServer(netWork string, singlePort bool, deviceId string, channelId string) error { +func (s *GB28181Server) OnStopMediaServer(netWork string, singlePort bool, deviceId string, channelId string, StreamName string) error { isTcpFlag := false if netWork == "tcp" { isTcpFlag = true @@ -249,8 +250,11 @@ func (s *GB28181Server) OnStopMediaServer(netWork string, singlePort bool, devic } } if mediasvr != nil { - mediasvr.Dispose() - + if singlePort { + mediasvr.CloseConn(StreamName) + } else { + mediasvr.Dispose() + } } return nil } @@ -719,6 +723,12 @@ func (s *GB28181Server) StoreDevice(id string, req sip.Request) (d *Device) { d.UpdateTime = time.Now() d.NetAddr = deviceIp d.addr = deviceAddr + d.network = strings.ToLower(req.Transport()) + if d.network == "udp" { + d.sipSvr = s.sipUdpSvr + } else { + d.sipSvr = s.sipTcpSvr + } nazalog.Info("UpdateDevice, netaddr:", d.NetAddr) } else { servIp := req.Recipient().Host() @@ -735,6 +745,12 @@ func (s *GB28181Server) StoreDevice(id string, req sip.Request) (d *Device) { mediaIP: mediaIp, NetAddr: deviceIp, conf: s.conf, + network: strings.ToLower(req.Transport()), + } + if d.network == "udp" { + d.sipSvr = s.sipUdpSvr + } else { + d.sipSvr = s.sipTcpSvr } d.WithMediaServer(s) nazalog.Info("StoreDevice, deviceIp:", deviceIp, " serverIp:", servIp, " mediaIp:", mediaIp, " sipIP:", sipIp) @@ -758,64 +774,17 @@ func (s *GB28181Server) RecoverDevice(d *Device, req sip.Request) { d.sipIP = sipIp d.mediaIP = mediaIp d.NetAddr = deviceIp + d.network = strings.ToLower(req.Transport()) + if d.network == "udp" { + d.sipSvr = s.sipUdpSvr + } else { + d.sipSvr = s.sipTcpSvr + } d.UpdateTime = time.Now() nazalog.Info("RecoverDevice, deviceIp:", deviceIp, " serverIp:", servIp, " mediaIp:", mediaIp, " sipIP:", sipIp) } -func RandNumString(n int) string { - numbers := "0123456789" - return randStringBySoure(numbers, n) -} - -func RandString(n int) string { - letterBytes := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" - return randStringBySoure(letterBytes, n) -} - -// https://github.com/kpbird/golang_random_string -func randStringBySoure(src string, n int) string { - randomness := make([]byte, n) - - rand.Seed(time.Now().UnixNano()) - _, err := rand.Read(randomness) - if err != nil { - panic(err) - } - - l := len(src) - - // fill output - output := make([]byte, n) - for pos := range output { - random := randomness[pos] - randomPos := random % uint8(l) - output[pos] = src[randomPos] - } - - return string(output) -} - -func DecodeGbk(v interface{}, body []byte) error { - bodyBytes, err := GbkToUtf8(body) - if err != nil { - return err - } - decoder := xml.NewDecoder(bytes.NewReader(bodyBytes)) - decoder.CharsetReader = charset.NewReaderLabel - err = decoder.Decode(v) - return err -} - -func GbkToUtf8(s []byte) ([]byte, error) { - reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewDecoder()) - d, e := ioutil.ReadAll(reader) - if e != nil { - return s, e - } - return d, nil -} - type notifyMessage struct { ChannelInfo diff --git a/gb28181/util.go b/gb28181/util.go new file mode 100644 index 0000000..c5f1f00 --- /dev/null +++ b/gb28181/util.go @@ -0,0 +1,65 @@ +package gb28181 + +import ( + "bytes" + "encoding/xml" + "golang.org/x/net/html/charset" + "golang.org/x/text/encoding/simplifiedchinese" + "golang.org/x/text/transform" + "io/ioutil" + "math/rand" + "time" +) + +func RandNumString(n int) string { + numbers := "0123456789" + return randStringBySoure(numbers, n) +} + +func RandString(n int) string { + letterBytes := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + return randStringBySoure(letterBytes, n) +} + +// https://github.com/kpbird/golang_random_string +func randStringBySoure(src string, n int) string { + randomness := make([]byte, n) + + rand.Seed(time.Now().UnixNano()) + _, err := rand.Read(randomness) + if err != nil { + panic(err) + } + + l := len(src) + + // fill output + output := make([]byte, n) + for pos := range output { + random := randomness[pos] + randomPos := random % uint8(l) + output[pos] = src[randomPos] + } + + return string(output) +} + +func DecodeGbk(v interface{}, body []byte) error { + bodyBytes, err := GbkToUtf8(body) + if err != nil { + return err + } + decoder := xml.NewDecoder(bytes.NewReader(bodyBytes)) + decoder.CharsetReader = charset.NewReaderLabel + err = decoder.Decode(v) + return err +} + +func GbkToUtf8(s []byte) ([]byte, error) { + reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewDecoder()) + d, e := ioutil.ReadAll(reader) + if e != nil { + return s, e + } + return d, nil +}