Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
yinqiwen committed Jun 18, 2018
1 parent adb7ee4 commit dc3c75a
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 15 deletions.
6 changes: 6 additions & 0 deletions client.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
"SessionIdleTimeout":300
},

"ProxyLimit":{
"WhiteList":["*"],
"BlackList":[]
},

// expose port by upnp for p2p
//"UPNPExposePort":56789,

Expand Down Expand Up @@ -72,6 +77,7 @@
//{"Host":["*"],"Remote":"direct"},
//{"URL":["*"],"Remote":"direct"},
//{"Method":["CONNECT"],"Remote":"direct"}
{"Rule":["IsPrivateIP"],"Remote":"direct"},
{"Remote":"Default"}
]
},
Expand Down
1 change: 1 addition & 0 deletions common/channel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ type ProxyChannelConfig struct {
RemoteSNIProxy map[string]string
HibernateAfterSecs int
P2PToken string
P2S2PEnable bool

proxyURL *url.URL
lazyConnect bool
Expand Down
33 changes: 30 additions & 3 deletions common/channel/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (s *muxSessionHolder) init(lock bool) error {
logger.Debug("Mux session woulde expired after %d seconds.", expireAfter)
s.expireTime = time.Now().Add(time.Duration(expireAfter) * time.Second)
}

if features.Pingable && s.conf.HeartBeatPeriod > 0 {
go s.heartbeat(s.conf.HeartBeatPeriod)
}
Expand Down Expand Up @@ -192,6 +193,14 @@ func (ch *LocalProxyChannel) isP2PSessionEstablisehd() bool {
return !empty
}

func (ch *LocalProxyChannel) closeAll() {
for holder := range ch.sessions {
if nil != holder {
holder.close()
}
}
}

func (ch *LocalProxyChannel) setP2PSession(c net.Conn, s mux.MuxSession, authReq *mux.AuthRequest) {
ch.p2pSessions.Store(s, true)
if nil != s {
Expand Down Expand Up @@ -284,18 +293,36 @@ func (ch *LocalProxyChannel) Init(lock bool) bool {
} else {
v := reflect.New(t)
p := v.Interface().(LocalChannel)
shouldInit := false
if ch.Conf.P2S2PEnable && len(conf.P2PToken) > 0 {
ch.Conf.lazyConnect = false
shouldInit = true
if ch.Conf.HeartBeatPeriod <= 0 || ch.Conf.HeartBeatPeriod >= 10 {
ch.Conf.HeartBeatPeriod = 3
}
}
for i := 0; i < conf.ConnsPerServer; i++ {
_, err := ch.createMuxSessionByProxy(p, server, i == 0)
if 0 == i {
shouldInit = true
}
holder, err := ch.createMuxSessionByProxy(p, server, shouldInit)
if nil != err {
logger.Error("[ERROR]Failed to create mux session for %s:%d with reason:%v", server, i, err)
break
} else {
success = true
if len(conf.P2PToken) > 0 {
if !conf.P2S2PEnable {
holder.close()
}
}
}
}
if success {
if len(conf.P2PToken) > 0 && strings.EqualFold(schema, "tcp") {
go startP2PSession(server, p, ch)
if len(conf.P2PToken) > 0 {
if !conf.P2S2PEnable {
go startP2PSession(server, p, ch)
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions common/channel/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func handleProxyStream(stream mux.MuxStream, ctx *sessionContext) {
logger.Error("[ERROR]:Failed to read connect request:%v", err)
return
}
start := time.Now()
logger.Debug("[%d]Start handle stream:%v with comprresor:%s", stream.StreamID(), creq, ctx.auth.CompressMethod)
if !defaultProxyLimitConfig.Allowed(creq.Addr) {
logger.Error("'%s' is NOT allowed by proxy limit config.", creq.Addr)
Expand Down Expand Up @@ -209,6 +210,7 @@ func handleProxyStream(stream mux.MuxStream, ctx *sessionContext) {
if len(buf) > 0 {
upBytesPool.Put(buf)
}
logger.Debug("[%d]1Cost %v to handle stream:%v ", stream.StreamID(), time.Now().Sub(start), creq)
closeSig <- true
}()

Expand All @@ -234,6 +236,7 @@ func handleProxyStream(stream mux.MuxStream, ctx *sessionContext) {
}
c.Close()
stream.Close()
logger.Debug("[%d]2Cost %v to handle stream:%v ", stream.StreamID(), time.Now().Sub(start), creq)
break
}
if len(buf) > 0 {
Expand All @@ -246,6 +249,7 @@ func handleProxyStream(stream mux.MuxStream, ctx *sessionContext) {
if close, ok := streamReader.(io.Closer); ok {
close.Close()
}
logger.Debug("[%d]Cost %v to handle stream:%v ", stream.StreamID(), time.Now().Sub(start), creq)
}

var DefaultServerCipher CipherConfig
Expand Down Expand Up @@ -332,6 +336,9 @@ func ServProxyMuxSession(session mux.MuxSession, auth *mux.AuthRequest, raddr ne
}
continue
}
if 0 == atomic.LoadInt32(&ctx.streamCouter) {
emptySessions.Store(ctx, true)
}
stream, err := session.AcceptStream()
if nil != err {
if err != pmux.ErrSessionShutdown {
Expand Down
7 changes: 6 additions & 1 deletion common/channel/remote_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"net"
"sync"
"time"

"github.com/yinqiwen/gsnova/common/logger"
"github.com/yinqiwen/gsnova/common/mux"
Expand Down Expand Up @@ -130,14 +131,18 @@ func handleP2PProxyStream(stream mux.MuxStream, ctx *sessionContext) {
stream.Close()
return
}
start := time.Now()
closeSig := make(chan bool, 1)
go func() {
io.Copy(stream, peerStream)
closeSig <- true
logger.Info("P2P:Cost %v to copy local to remote", time.Now().Sub(start))
stream.Close()
closeSig <- true
}()
io.Copy(peerStream, stream)
logger.Info("P2P:Cost %v to copy remote to local", time.Now().Sub(start))
<-closeSig
stream.Close()
peerStream.Close()

}
26 changes: 17 additions & 9 deletions local/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
BlockedByGFWRule = "BlockedByGFW"
InHostsRule = "InHosts"
IsCNIPRule = "IsCNIP"
IsPrivateIPRule = "IsPrivateIP"
)

func matchHostnames(pattern, host string) bool {
Expand Down Expand Up @@ -116,6 +117,12 @@ func (pac *PACConfig) matchRules(ip string, req *http.Request) bool {
}
logger.Debug("ip:%s is CNIP:%v", ip, ok)
}
} else if strings.EqualFold(rule, IsPrivateIPRule) {
if len(ip) == 0 {
ok = false
} else {
ok = helper.IsPrivateIP(ip)
}
} else {
logger.Error("###Invalid rule:%s", rule)
}
Expand Down Expand Up @@ -187,11 +194,12 @@ func (dump *HTTPDumpConfig) MatchDomain(host string) bool {
}

type ProxyConfig struct {
Local string
Forward string
MITM bool //Man-in-the-middle
HTTPDump HTTPDumpConfig
PAC []PACConfig
Local string
Forward string
MITM bool //Man-in-the-middle
Transparent bool
HTTPDump HTTPDumpConfig
PAC []PACConfig
}

func (cfg *ProxyConfig) getProxyChannelByHost(proto string, host string) string {
Expand All @@ -201,10 +209,10 @@ func (cfg *ProxyConfig) getProxyChannelByHost(proto string, host string) string

func (cfg *ProxyConfig) findProxyChannelByRequest(proto string, ip string, req *http.Request) string {
var channelName string
if len(ip) > 0 && helper.IsPrivateIP(ip) {
//channel = "direct"
return channel.DirectChannelName
}
// if len(ip) > 0 && helper.IsPrivateIP(ip) {
// //channel = "direct"
// return channel.DirectChannelName
// }
for _, pac := range cfg.PAC {
if pac.Match(proto, ip, req) {
channelName = pac.Remote
Expand Down
17 changes: 15 additions & 2 deletions local/local_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type proxyStreamContext struct {
c io.ReadWriteCloser
}

var ssidSeed = uint32(0)

func serveProxyConn(conn net.Conn, remoteHost, remotePort string, proxy *ProxyConfig) {
var proxyChannelName string
protocol := "tcp"
Expand Down Expand Up @@ -98,7 +100,7 @@ func serveProxyConn(conn net.Conn, remoteHost, remotePort string, proxy *ProxyCo
mitmEnabled = true
return nil
}

//logger.Info("###Enter with %v %v", isTransparentProxy, remoteHost)
if !isTransparentProxy {
socksConn, sbufconn, err := socks.NewSocksConn(conn)
if nil == err {
Expand Down Expand Up @@ -291,6 +293,9 @@ START:
}

ssid := stream.StreamID()
if 0 == ssid {
ssid = atomic.AddUint32(&ssidSeed, uint32(1))
}
opt := mux.StreamOptions{
DialTimeout: conf.RemoteDialMSTimeout,
Hops: conf.Hops,
Expand Down Expand Up @@ -350,12 +355,15 @@ START:
streamCtx.c = localConn
activeStreams.Store(streamCtx, true)

start := time.Now()
closeCh := make(chan int, 1)
go func() {
//buf := make([]byte, 128*1024)
buf := downBytesPool.Get().([]byte)
io.CopyBuffer(localConn, streamReader, buf)
logger.Notice("Proxy stream[%d] cost %v to copy from %s:%v", ssid, time.Now().Sub(start), remoteHost, remotePort)
localConn.Close()
bufconn.Close()
downBytesPool.Put(buf)
closeCh <- 1
}()
Expand All @@ -368,6 +376,7 @@ START:
for {
localConn.SetReadDeadline(time.Now().Add(maxIdleTime))
_, cerr := io.CopyBuffer(streamWriter, bufconn, buf)
//logger.Notice("Proxy stream[%d] cost %v to copy from local to %s:%v %v", ssid, time.Now().Sub(start), remoteHost, remotePort, cerr)
if isTimeoutErr(cerr) && time.Now().Sub(stream.LatestIOTime()) < maxIdleTime {
continue
}
Expand All @@ -377,7 +386,11 @@ START:
upBytesPool.Put(buf)
if close, ok := streamWriter.(io.Closer); ok {
close.Close()
} else {
stream.Close()
}
//localConn.Close()
logger.Notice("Proxy stream[%d] cost %v to copy from local to %s:%v", ssid, time.Now().Sub(start), remoteHost, remotePort)
} else {
proxyReq := initialHTTPReq
initialHTTPReq = nil
Expand Down Expand Up @@ -447,7 +460,7 @@ func startLocalProxyServer(proxyIdx int) (*net.TCPListener, error) {
continue
}
isTransparent := false
if supportTransparentProxy() {
if proxyConf.Transparent && supportTransparentProxy() {
tcpAddr, ok := conn.RemoteAddr().(*net.TCPAddr)
if ok {
_, exist := helper.GetLocalIPSet()[tcpAddr.IP.String()]
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func main() {
servable := flag.Bool("servable", false, "Client as a proxy server for peer p2p client")
proxy := flag.String("proxy", "", "Proxy setting to connect remote server.")
upnpPort := flag.Int("upnp", 0, "UPNP port to expose for p2p.")
p2s2p := flag.Bool("p2s2p", false, "Connect two peers by P2S2P mode.")

//client or server listen
var listens channel.HopServers
Expand Down Expand Up @@ -195,6 +196,7 @@ func main() {
ch.ServerList = []string{hops[0]}
ch.Hops = hops[1:]
ch.P2PToken = *p2p
ch.P2S2PEnable = *p2s2p

ch.Proxy = *proxy
local.GConf.Channel = []channel.ProxyChannelConfig{ch}
Expand Down

0 comments on commit dc3c75a

Please sign in to comment.