Skip to content

Commit

Permalink
Merge pull request #55 from OneHeng/gb28181-mediaserver
Browse files Browse the repository at this point in the history
gb28181 单独实现媒体服务
  • Loading branch information
ZSC714725 authored Mar 8, 2024
2 parents 3e2bd91 + b39cd05 commit 994c32e
Show file tree
Hide file tree
Showing 12 changed files with 423 additions and 185 deletions.
32 changes: 18 additions & 14 deletions conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,24 @@ type HlsConfig struct {
}

type GB28181Config struct {
Enable bool `json:"enable"` // gb28181使能标志
ListenAddr string `json:"listenAddr"` // gb28181监听地址
SipNetwork string `json:"sipNetwork"` // 传输协议,默认UDP,可选TCP
SipIP string `json:"sipIp"` // sip 服务器公网IP
SipPort uint16 `json:"sipPort"` // sip 服务器端口,默认 5060
Serial string `json:"serial"` // sip 服务器 id, 默认 34020000002000000001
Realm string `json:"realm"` // sip 服务器域,默认 3402000000
Username string `json:"username"` // sip 服务器账号
Password string `json:"password"` // sip 服务器密码
StreamIP string `json:"streamIp"` // 流媒体IP
ApiPort uint16 `json:"apiPort"` // 流媒体 Api 端口
ApiSsl bool `json:"apiSsl"` //流媒体 Api 是否ssl
KeepaliveInterval int `json:"keepaliveInterval"` //心跳包时长
QuickLogin bool `json:"quickLogin"` //快速登陆,有keepalive就认为在线
Enable bool `json:"enable"` // gb28181使能标志
ListenAddr string `json:"listenAddr"` // gb28181监听地址
SipNetwork string `json:"sipNetwork"` // 传输协议,默认UDP,可选TCP
SipIP string `json:"sipIp"` // sip 服务器公网IP
SipPort uint16 `json:"sipPort"` // sip 服务器端口,默认 5060
Serial string `json:"serial"` // sip 服务器 id, 默认 34020000002000000001
Realm string `json:"realm"` // sip 服务器域,默认 3402000000
Username string `json:"username"` // sip 服务器账号
Password string `json:"password"` // sip 服务器密码
KeepaliveInterval int `json:"keepaliveInterval"` // 心跳包时长
QuickLogin bool `json:"quickLogin"` // 快速登陆,有keepalive就认为在线
MediaConfig GB28181MediaConfig `json:"media_config"` // 媒体服务器配置
}

type GB28181MediaConfig struct {
MediaIp string `json:"mediaIp"` // 流媒体IP,用于在SDP中指定
TCPListenPort uint16 `json:"tcp_listen_port"` // tcp监听端口
UDPListenPort uint16 `json:"udp_listen_port"` // udp监听端口
}

type OnvifConfig struct {
Expand Down
12 changes: 8 additions & 4 deletions conf/lalmax.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@
"enable": true,
"serial": "34020000002000000001",
"realm": "3402000000",
"sipIp": "100.100.100.100",
"sipPort": 5060,
"sipNetwork": "tcp",
"sipIp": "100.100.108.230",
"sipPort": 5060,
"username": "",
"streamIp": "100.100.100.100",
"quickLogin": true
"quickLogin": true,
"media_config": {
"mediaIp": "100.100.108.230",
"tcp_listen_port": 30000,
"udp_listen_port": 30000
}
},
"onvif_config": {
"enable": true
Expand Down
167 changes: 20 additions & 147 deletions gb28181/channel.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
package gb28181

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
config "lalmax/conf"
"net/http"
"strconv"
"strings"
"time"

"github.com/ghettovoice/gosip/sip"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
)

type Channel struct {
device *Device // 所属设备
//status atomic.Int32 // 通道状态,0:空闲,1:正在invite,2:正在播放
GpsTime time.Time // gps时间
number uint16
sessionId string
ackReq sip.Request
GpsTime time.Time // gps时间
number uint16
ackReq sip.Request
ChannelInfo
}

Expand Down Expand Up @@ -54,9 +49,9 @@ const (
ChannelOffStatus = "OFF"
)

func (channel *Channel) TryAutoInvite(opt *InviteOptions, conf config.GB28181Config, streamName string) {
func (channel *Channel) TryAutoInvite(opt *InviteOptions, conf config.GB28181Config, streamName, network string) {
if channel.CanInvite(streamName) {
go channel.Invite(opt, conf, streamName)
go channel.Invite(opt, conf, streamName, network)
}
}

Expand All @@ -69,12 +64,9 @@ func (channel *Channel) CanInvite(streamName string) bool {
return false
}
d := channel.device
//通过lal api是否有流信息
if d.DoLalGroupInfo(streamName) != nil {

if d.mediaInfo.IsInvite {
return false
} else {
channel.ackReq = nil
channel.sessionId = ""
}

// 11~13位是设备类型编码
Expand Down Expand Up @@ -135,7 +127,7 @@ f字段中视、音频参数段之间不需空格分割。
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
*/

func (channel *Channel) Invite(opt *InviteOptions, conf config.GB28181Config, streamName string) (code int, err error) {
func (channel *Channel) Invite(opt *InviteOptions, conf config.GB28181Config, streamName, network string) (code int, err error) {
d := channel.device
s := "Play"

Expand All @@ -147,21 +139,13 @@ func (channel *Channel) Invite(opt *InviteOptions, conf config.GB28181Config, st
opt.CreateSSRC(channel.ChannelId, channel.number)

protocol := ""
networkType := conf.SipNetwork
nazalog.Info("networkType:", network)

nazalog.Info("networkType:", networkType)

// 获取lal的端口
ctrlStartRtpPubResp, err := d.DoLalStartRtpPub(streamName, networkType)
if err != nil {
return http.StatusInternalServerError, err
}
if ctrlStartRtpPubResp.ApiRespBasic.ErrorCode != 0 {
return http.StatusInternalServerError, fmt.Errorf("start rtp pub error: %v", ctrlStartRtpPubResp.ApiRespBasic.ErrorCode)
}
opt.MediaPort = uint16(ctrlStartRtpPubResp.Data.Port)
if networkType == "tcp" {
if network == "tcp" {
opt.MediaPort = conf.MediaConfig.TCPListenPort
protocol = "TCP/"
} else {
opt.MediaPort = conf.MediaConfig.UDPListenPort
}

sdpInfo := []string{
Expand All @@ -176,7 +160,7 @@ func (channel *Channel) Invite(opt *InviteOptions, conf config.GB28181Config, st
"y=" + opt.ssrc,
}

if networkType == "tcp" {
if network == "tcp" {
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
}

Expand Down Expand Up @@ -216,13 +200,17 @@ func (channel *Channel) Invite(opt *InviteOptions, conf config.GB28181Config, st
nazalog.Info("Device support tcp")
} else {
nazalog.Info("Device not support tcp")
networkType = "udp"
network = "udp"
}
}
}
}

d.mediaInfo.IsInvite = true
d.mediaInfo.Ssrc = opt.SSRC
d.mediaInfo.StreamName = streamName

ackReq := sip.NewAckRequest("", invite, inviteRes, "", nil)
channel.sessionId = ctrlStartRtpPubResp.Data.SessionId
channel.ackReq = ackReq

err = sipsvr.Send(ackReq)
Expand All @@ -237,12 +225,6 @@ func (channel *Channel) Bye(streamName string) (err error) {
seq.SeqNo += 1
err = sipsvr.Send(byeReq)
if err == nil {
if channel.sessionId != "" {
d := channel.device
if err = d.DoLalKickSession(streamName, channel.sessionId); err == nil {
channel.sessionId = ""
}
}
channel.ackReq = nil
}
}
Expand Down Expand Up @@ -305,112 +287,3 @@ func (channel *Channel) CreateRequst(Method sip.RequestMethod, conf config.GB281
req.SetDestination(d.NetAddr)
return req
}

func (d *Device) DoLalStartRtpPub(deviceId, networkType string) (*base.ApiCtrlStartRtpPubResp, error) {
request := &base.ApiCtrlStartRtpPubReq{
StreamName: deviceId,
}

if networkType == "tcp" {
request.IsTcpFlag = 1
}

data, _ := json.Marshal(request)
url := ""
if d.ApiSsl {
url = fmt.Sprintf("https://%s:%d/api/ctrl/start_rtp_pub", d.mediaIP, d.ApiPort)
} else {
url = fmt.Sprintf("http://%s:%d/api/ctrl/start_rtp_pub", d.mediaIP, d.ApiPort)
}
body, err := d.DoLalHttpReq("POST", url, data)
if err != nil {
return nil, err
}
response := &base.ApiCtrlStartRtpPubResp{}
err = json.Unmarshal(body, &response)
if err != nil {
return nil, err
}

nazalog.Info("start_rtp_pub response:", response)

return response, nil
}
func (d *Device) DoLalGroupInfo(deviceId string) error {
url := ""
if d.ApiSsl {
url = fmt.Sprintf("https://%s:%d/api/stat/group?stream_name=%s", d.mediaIP, d.ApiPort, deviceId)
} else {
url = fmt.Sprintf("http://%s:%d/api/stat/group?stream_name=%s", d.mediaIP, d.ApiPort, deviceId)
}
body, err := d.DoLalHttpReq("GET", url, nil)
if err != nil {
return err
}
response := &base.ApiStatGroupResp{}
err = json.Unmarshal(body, &response)
if err != nil {
return err
}
if response.ApiRespBasic.ErrorCode == 1001 {
return nil
} else {
return fmt.Errorf("stat group error: %v", response.ApiRespBasic.ErrorCode)
}
}
func (d *Device) DoLalKickSession(streamName, sessionId string) error {
url := ""
if d.ApiSsl {
url = fmt.Sprintf("https://%s:%d/api/ctrl/kick_session", d.mediaIP, d.ApiPort)
} else {
url = fmt.Sprintf("http://%s:%d/api/ctrl/kick_session", d.mediaIP, d.ApiPort)
}
request := &base.ApiCtrlKickSessionReq{
StreamName: streamName,
SessionId: sessionId,
}
data, _ := json.Marshal(request)
body, err := d.DoLalHttpReq("POST", url, data)
if err != nil {
return err
}
response := &base.ApiRespBasic{}
err = json.Unmarshal(body, &response)
if err != nil {
return err
}
if response.ErrorCode == 0 {
return nil
} else {
return fmt.Errorf("kick session error: %v", response.ErrorCode)
}
}
func (d *Device) DoLalHttpReq(method string, url string, reqBody []byte) (respBody []byte, err error) {
req, err := http.NewRequest(method, url, bytes.NewReader(reqBody))
if err != nil {
return nil, err
}

req.Header.Set("Content-Type", "application/json")

cli := &http.Client{
Transport: http.DefaultTransport,
Timeout: time.Duration(5) * time.Second,
}

resp, err := cli.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode != 200 {
return nil, fmt.Errorf("Response is not 200: %v", resp.Status)
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, err
}
7 changes: 5 additions & 2 deletions gb28181/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ type Device struct {
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //纬度
ApiPort uint16
ApiSsl bool //流媒体 Api 是否ssl
mediaInfo struct {
IsInvite bool
Ssrc uint32
StreamName string
}
}

func (d *Device) syncChannels(conf config.GB28181Config) {
Expand Down
7 changes: 3 additions & 4 deletions gb28181/http_logic.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package gb28181

import (
"github.com/gin-gonic/gin"
"sync"

"github.com/gin-gonic/gin"
)

type GbLogic struct {
Expand Down Expand Up @@ -39,7 +40,7 @@ func (g *GbLogic) StartPlay(c *gin.Context) {
if len(streamName) == 0 {
streamName = reqPlay.ChannelId
}
ch.TryAutoInvite(&InviteOptions{}, g.s.conf, streamName)
ch.TryAutoInvite(&InviteOptions{}, g.s.conf, streamName, reqPlay.NetWork)
respPlay := &RespPlay{
StreamName: streamName,
}
Expand All @@ -56,7 +57,6 @@ func (g *GbLogic) StopPlay(c *gin.Context) {
ch := g.s.FindChannel(reqStop.DeviceId, reqStop.ChannelId)
if ch == nil {
ResponseErrorWithMsg(c, CodeDeviceNotRegister, CodeDeviceNotRegister.Msg())

} else {
streamName := reqStop.StreamName
if len(streamName) == 0 {
Expand All @@ -67,7 +67,6 @@ func (g *GbLogic) StopPlay(c *gin.Context) {
} else {
ResponseSuccess(c, nil)
}

}
}

Expand Down
Loading

0 comments on commit 994c32e

Please sign in to comment.