Skip to content

Commit

Permalink
Merge pull request #81 from privacylab/httprpc
Browse files Browse the repository at this point in the history
Use HTTP for communication
  • Loading branch information
willscott authored Dec 1, 2017
2 parents 4fbf7fb + 82ddfa0 commit 70dcbe2
Show file tree
Hide file tree
Showing 77 changed files with 3,086 additions and 408 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ script:
- make ci
go:
- 1.8
- master
- 1.9
notifications:
email:
- [email protected]
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ All tests should pass before submitting a pull request
$ make test
```

The GPU backings are not built by default. Changes to `pir/`, where the
backing interface may be affected should ensure that code is tested with
`go test -tags 'cuda,opencl'` to include testing of all drivers.

### Vendoring
Talek vendors all of its dependencies into the local `vendor/` directory.
To add or update dependencies to the latest in `vendor/`, use the `govendor` tool, as follows:
Expand Down
4 changes: 2 additions & 2 deletions benchmark/commonconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
"BloomFalsePositive": 0.001,
"MaxLoadFactor": 0.90,
"LoadFactorStep": 0.05,
"WriteInterval": 5000000000,
"ReadInterval": 5000000000
"WriteInterval": "5000000000",
"ReadInterval": "5000000000"
}
7 changes: 4 additions & 3 deletions benchmark/consistency/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ func main() {
sc1 := server.Config{Config: conf, WriteInterval: time.Second, ReadBatch: 4, TrustDomain: td1}
sc2 := server.Config{Config: conf, ReadBatch: 4, TrustDomain: td2, TrustDomainIndex: 1}
//replicas
r1 := server.NewCentralized("r1", "cpu.0", sc1)
r2 := server.NewCentralized("r2", "cpu.0", sc2)
r1 := server.NewReplica("r1", "cpu.0", sc1)
r2 := server.NewReplica("r2", "cpu.0", sc2)
//client
config = &libtalek.ClientConfig{Config: conf, WriteInterval: time.Second, ReadInterval: time.Second, TrustDomains: []*common.TrustDomainConfig{td1, td2}, FrontendAddr: "localhost:9000"}
//frontend
f0 := server.NewFrontend("f0", &sc1, []common.ReplicaInterface{common.ReplicaInterface(r1), common.ReplicaInterface(r2)})
f0.Verbose = true
server.NewNetworkRPC(f0, 9000)
f0s := server.FrontendServer{Frontend: f0}
f0s.Run("localhost:9000")
} else {
// Config
config = libtalek.ClientConfigFromFile(*configPath)
Expand Down
34 changes: 19 additions & 15 deletions benchmark/load/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ var leaderPIR = flag.String("leader", "cpu.0", "PIR backing for leader shard")
var followerPIR = flag.String("follower", "cpu.1", "PIR backing for follower shard")

type killable interface {
Kill()
Close() error
}

func main() {
var err error

log.Println("Simple Sanity Test")
s := make(map[string]killable)
flag.Parse()
Expand All @@ -44,19 +46,20 @@ func main() {
serverConfigF.TrustDomain = trustDomainFE

// Trust Domain 1
t1 := server.NewCentralized("t1", *followerPIR, *serverConfig1)
s["t1"] = server.NewNetworkRPC(t1, 9001)
t1 := server.NewReplicaServer("t1", *followerPIR, *serverConfig1)
s["t1"], err = t1.Run("localhost:9001")
if err != nil {
panic(err)
}

// Trust Domain 0
//t0 := server.NewCentralized("t0", config, t1, true)
t0 := server.NewCentralized("t0", *leaderPIR, *serverConfig0)
s["t0"] = server.NewNetworkRPC(t0, 9000)
t0 := server.NewReplicaServer("t0", *leaderPIR, *serverConfig0)
s["t0"], _ = t0.Run("localhost:9000")

// Frontend
ft0 := common.NewReplicaRPC("f0-t0", trustDomainConfig0)
ft1 := common.NewReplicaRPC("f0-t1", trustDomainConfig1)
f0 := server.NewFrontend("f0", serverConfigF, []common.ReplicaInterface{ft0, ft1})
server.NewNetworkRPC(f0, 8999)
f0 := server.NewFrontendServer("f0", serverConfigF, []*common.TrustDomainConfig{trustDomainConfig0, trustDomainConfig1})
s["f0"], _ = f0.Run("localhost:8999")

// Client
clientConfig := libtalek.ClientConfig{
Expand All @@ -72,19 +75,20 @@ func main() {
for i := 0; i < *numClients; i++ {
clients[i] = libtalek.NewClient("c"+string(i), clientConfig, clientLeaderSock)
handle, _ := libtalek.NewTopic()
var origHandle libtalek.Handle
origHandle = handle.Handle
clients[i].Publish(handle, []byte("Hello from client"+string(i)))
data := clients[i].Poll(&handle.Handle)
fmt.Printf("!!! data=%v", data)
fmt.Printf("Published. Waiting for response.")
data := clients[i].Poll(&origHandle)
_ = <-data
fmt.Printf("Client Roundtrip.")
}
//c1.Ping()

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
for _, v := range s {
v.Kill()
v.Close()
}
f0.Close()
t1.Close()
t0.Close()
}
13 changes: 13 additions & 0 deletions cli/clinfo/fallback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

import (
"log"
)

func fallback() {
log.Fatal("Can't run CLinfo without openCL tag")
}

func main() {
return
}
4 changes: 2 additions & 2 deletions cli/clinfo/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//+build !noopencl,!travis
//+build opencl,!travis

package main

Expand Down Expand Up @@ -73,6 +73,6 @@ func StatInfo() {
}
}

func main() {
func init() {
StatInfo()
}
4 changes: 2 additions & 2 deletions cli/talekclient/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func main() {
}

// Client-connected activity below.
leaderRPC := common.NewFrontendRPC("RPC", config.FrontendAddr)
frontendRPC := common.NewFrontendRPC("RPC", config.FrontendAddr)

client := libtalek.NewClient("Client", *config, leaderRPC)
client := libtalek.NewClient("Client", *config, frontendRPC)
if client == nil {
panic("could not create talek client")
} else if *verbose {
Expand Down
20 changes: 8 additions & 12 deletions cli/talekfrontend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package main

import (
"log"
"net"
"os"
"os/signal"
"strconv"

"github.com/coreos/etcd/pkg/flags"
"github.com/privacylab/talek/common"
Expand All @@ -23,6 +21,7 @@ func main() {
configPath := pflag.String("client", "talek.conf", "Talek Client Configuration")
commonPath := pflag.String("common", "common.conf", "Talek Common Configuration")
systemPath := pflag.String("server", "server.conf", "Talek Server Configuration")
listen := pflag.StringP("listen", "l", ":8080", "Listening Address")
verbose := pflag.Bool("verbose", false, "Verbose output")
err := flags.SetPflagsFromEnv(common.EnvPrefix, pflag.CommandLine)
if err != nil {
Expand All @@ -39,21 +38,18 @@ func main() {
config.Config = common.ConfigFromFile(*commonPath)
serverConfig := server.ConfigFromFile(*systemPath, config.Config)

replicas := make([]common.ReplicaInterface, len(config.TrustDomains))
for i, td := range config.TrustDomains {
replicas[i] = common.NewReplicaRPC(td.Name, td)
f := server.NewFrontendServer("Talek Frontend", serverConfig, config.TrustDomains)
f.Frontend.Verbose = *verbose
listener, err := f.Run(*listen)
if err != nil {
log.Printf("Couldn't listen to frontend address: %v\n", err)
return
}

f := server.NewFrontend("Talek Frontend", serverConfig, replicas)
f.Verbose = *verbose
_, port, _ := net.SplitHostPort(config.FrontendAddr)
pnum, _ := strconv.Atoi(port)
_ = server.NewNetworkRPC(common.FrontendInterface(f), pnum)

log.Println("Running.")

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
f.Close()
listener.Close()
}
20 changes: 11 additions & 9 deletions cli/talekreplica/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"encoding/json"
"io/ioutil"
"log"
"net"
"os"
"os/signal"
"strconv"
"time"

"github.com/coreos/etcd/pkg/flags"
Expand All @@ -27,6 +25,7 @@ func main() {
configPath := pflag.StringP("config", "c", "replica.conf", "Talek Replica Configuration (env TALEK_CONFIG)")
commonPath := pflag.StringP("common", "f", "common.conf", "Talek Common Configuration (env TALEK_COMMON)")
backing := pflag.StringP("backing", "b", "cpu.0", "PIR daemon method (env TALEK_BACKING)")
listen := pflag.StringP("listen", "l", ":8080", "Listening Address")
err := flags.SetPflagsFromEnv(common.EnvPrefix, pflag.CommandLine)
if err != nil {
log.Printf("Error reading environment variables, %v\n", err)
Expand Down Expand Up @@ -59,11 +58,11 @@ func main() {
TrustDomain: &common.TrustDomainConfig{},
TrustDomainIndex: 0,
}
if err := json.Unmarshal(configString, &serverConfig); err != nil {
if err = json.Unmarshal(configString, &serverConfig); err != nil {
log.Printf("Could not parse %s: %v\n", *configPath, err)
return
}
if err := json.Unmarshal(commonString, serverConfig.Config); err != nil {
if err = json.Unmarshal(commonString, serverConfig.Config); err != nil {
log.Printf("Could not parse %s: %v\n", *commonPath, err)
return
}
Expand All @@ -72,15 +71,18 @@ func main() {
log.Printf("serverConfig=%#+v\n", serverConfig)
log.Printf("serverConfig.Config=%#+v\n", serverConfig.Config)

s := server.NewCentralized(serverConfig.TrustDomain.Name, *backing, serverConfig)
_, port, _ := net.SplitHostPort(serverConfig.TrustDomain.Address)
pnum, _ := strconv.Atoi(port)
_ = server.NewNetworkRPC(s, pnum)
r := server.NewReplicaServer(serverConfig.TrustDomain.Name, *backing, serverConfig)
listener, err := r.Run(*listen)
if err != nil {
log.Printf("Couldn't listen to frontend address: %v\n", err)
return
}

log.Println("Running.")

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
s.Close()
r.Replica.Close()
listener.Close()
}
2 changes: 1 addition & 1 deletion common/frontend_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type WriteArgs struct {
InterestVector []uint64
//Internal
GlobalSeqNo uint64
ReplyChan chan *WriteReply
ReplyChan chan *WriteReply `json:"-"`
}

// WriteReply contain return status of writes
Expand Down
40 changes: 4 additions & 36 deletions common/frontend_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package common

import (
"log"
"net/rpc"
"os"
)

Expand All @@ -12,7 +11,6 @@ type FrontendRPC struct {
name string
address string
methodPrefix string
client *rpc.Client
}

// NewFrontendRPC instantiates a LeaderRPC stub
Expand All @@ -21,41 +19,11 @@ func NewFrontendRPC(name string, address string) *FrontendRPC {
f.log = log.New(os.Stdout, "[FrontendRPC:"+name+"] ", log.Ldate|log.Ltime|log.Lshortfile)
f.name = name
f.address = address
f.client = nil
f.methodPrefix = "Frontend"

return f
}

// Call implements an RPC call
func (f *FrontendRPC) Call(methodName string, args interface{}, reply interface{}) error {
// Get address
var err error

// Setup connection
if f.client == nil {
f.client, err = rpc.Dial("tcp", f.address)
if err != nil {
f.log.Printf("rpc dialing failed: %v\n", err)
f.client = nil
return err
}
//defer client.Close()
}

// Do RPC
err = f.client.Call(methodName, args, reply)
if err != nil {
f.log.Printf("rpc error: %v", err)
f.client.Close()
f.client = nil
return err
}

//l.log.Printf("%s.Call(): %v, %v, %v\n", addr, args, reply)
return nil
}

// GetName returns the name of the leader.
func (f *FrontendRPC) GetName(_ *interface{}, reply *string) error {
*reply = f.name
Expand All @@ -65,25 +33,25 @@ func (f *FrontendRPC) GetName(_ *interface{}, reply *string) error {
// GetConfig tells the client about current config.
func (f *FrontendRPC) GetConfig(_ *interface{}, reply *Config) error {
var args interface{}
err := f.Call(f.methodPrefix+".GetConfig", &args, reply)
err := RPCCall(f.address, f.methodPrefix+".GetConfig", &args, reply)
return err
}

func (f *FrontendRPC) Write(args *WriteArgs, reply *WriteReply) error {
//l.log.Printf("Write: enter\n")
err := f.Call(f.methodPrefix+".Write", args, reply)
err := RPCCall(f.address, f.methodPrefix+".Write", args, reply)
return err
}

func (f *FrontendRPC) Read(args *EncodedReadArgs, reply *ReadReply) error {
//l.log.Printf("Read: enter\n")
err := f.Call(f.methodPrefix+".Read", args, reply)
err := RPCCall(f.address, f.methodPrefix+".Read", args, reply)
return err
}

// GetUpdates provides the global interest vector.
func (f *FrontendRPC) GetUpdates(args *GetUpdatesArgs, reply *GetUpdatesReply) error {
//l.log.Printf("GetUpdates: enter\n")
err := f.Call(f.methodPrefix+".GetUpdates", args, reply)
err := RPCCall(f.address, f.methodPrefix+".GetUpdates", args, reply)
return err
}
2 changes: 1 addition & 1 deletion common/replica_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ReplicaWriteReply struct {
type BatchReadRequest struct {
Args []EncodedReadArgs // Set of Read requests
SeqNoRange Range
ReplyChan chan *BatchReadReply
ReplyChan chan *BatchReadReply `json:"-"`
}

// BatchReadReply is a response to a BatchReadRequest.
Expand Down
Loading

0 comments on commit 70dcbe2

Please sign in to comment.