Skip to content

Commit

Permalink
feat: secure couchbase connection support added
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Mar 28, 2023
1 parent d7553c2 commit 557be83
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 115 deletions.
41 changes: 21 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,34 @@ This repository contains go implementation of a Couchbase Database Change Protoc
package main

import (
"github.com/Trendyol/go-dcp-client"
"github.com/Trendyol/go-dcp-client/logger"
"github.com/Trendyol/go-dcp-client"
"github.com/Trendyol/go-dcp-client/logger"

"github.com/Trendyol/go-dcp-client/models"
"github.com/Trendyol/go-dcp-client/models"
)

func listener(ctx *models.ListenerContext) {
switch event := ctx.Event.(type) {
case models.DcpMutation:
logger.Log.Printf("mutated(vb=%v) | id: %v, value: %v | isCreated: %v", event.VbID, string(event.Key), string(event.Value), event.IsCreated())
case models.DcpDeletion:
logger.Log.Printf("deleted(vb=%v) | id: %v", event.VbID, string(event.Key))
case models.DcpExpiration:
logger.Log.Printf("expired(vb=%v) | id: %v", event.VbID, string(event.Key))
}

ctx.Ack()
switch event := ctx.Event.(type) {
case models.DcpMutation:
logger.Log.Printf("mutated(vb=%v) | id: %v, value: %v | isCreated: %v", event.VbID, string(event.Key), string(event.Value), event.IsCreated())
case models.DcpDeletion:
logger.Log.Printf("deleted(vb=%v) | id: %v", event.VbID, string(event.Key))
case models.DcpExpiration:
logger.Log.Printf("expired(vb=%v) | id: %v", event.VbID, string(event.Key))
}

ctx.Ack()
}

func main() {
dcp, err := godcpclient.NewDcp("config.yml", listener)
if err != nil {
panic(err)
}
dcp, err := godcpclient.NewDcp("config.yml", listener)
if err != nil {
panic(err)
}

defer dcp.Close()
defer dcp.Close()

dcp.Start()
dcp.Start()
}
```

Expand All @@ -69,6 +69,8 @@ $ go get github.com/Trendyol/go-dcp-client
| `password` | string | yes | |
| `bucketName` | string | yes | |
| `dcp.group.name` | string | yes | |
| `secureConnection` | bool | no | false |
| `rootCAPath` | string | no | *not set |
| `dcp.group.membership.type` | string | no | couchbase |
| `scopeName` | string | no | _default |
| `collectionNames` | []string | no | _default |
Expand All @@ -87,7 +89,6 @@ $ go get github.com/Trendyol/go-dcp-client
| `leaderElection.type` | string | no | *not set |
| `leaderElection.config` | map[string]string | no | *not set |
| `leaderElection.rpc.port` | int | no | 8081 |
| `logger.level` | string | no | info |
| `checkpoint.type` | string | no | auto |
| `checkpoint.autoReset` | string | no | earliest |
| `checkpoint.interval` | time.Duration | no | 20s |
Expand Down
45 changes: 33 additions & 12 deletions couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package couchbase

import (
"context"
"crypto/x509"
"errors"
"fmt"
"os"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -113,19 +115,43 @@ func (s *client) GetAgent() *gocbcore.Agent {
return s.agent
}

func (s *client) tlsRootCaProvider() *x509.CertPool {
cert, err := os.ReadFile(os.ExpandEnv(s.config.RootCAPath))
if err != nil {
logger.ErrorLog.Printf("error while reading cert file: %v", err)
panic(err)
}

certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(cert)

return nil
}

func (s *client) getSecurityConfig() gocbcore.SecurityConfig {
config := gocbcore.SecurityConfig{
Auth: gocbcore.PasswordAuthProvider{
Username: s.config.Username,
Password: s.config.Password,
},
}

if s.config.SecureConnection {
config.UseTLS = true
config.TLSRootCAProvider = s.tlsRootCaProvider
}

return config
}

func (s *client) connect(bucketName string) (*gocbcore.Agent, error) {
client, err := gocbcore.CreateAgent(
&gocbcore.AgentConfig{
BucketName: bucketName,
SeedConfig: gocbcore.SeedConfig{
HTTPAddrs: s.config.Hosts,
},
SecurityConfig: gocbcore.SecurityConfig{
Auth: gocbcore.PasswordAuthProvider{
Username: s.config.Username,
Password: s.config.Password,
},
},
SecurityConfig: s.getSecurityConfig(),
CompressionConfig: gocbcore.CompressionConfig{
Enabled: true,
},
Expand Down Expand Up @@ -213,12 +239,7 @@ func (s *client) DcpConnect() error {
SeedConfig: gocbcore.SeedConfig{
HTTPAddrs: s.config.Hosts,
},
SecurityConfig: gocbcore.SecurityConfig{
Auth: gocbcore.PasswordAuthProvider{
Username: s.config.Username,
Password: s.config.Password,
},
},
SecurityConfig: s.getSecurityConfig(),
CompressionConfig: gocbcore.CompressionConfig{
Enabled: true,
},
Expand Down
2 changes: 0 additions & 2 deletions dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ metadata:
bucket: sample
checkpoint:
type: manual
logging:
level: debug
dcp:
listener:
bufferSize: 1024
Expand Down
73 changes: 0 additions & 73 deletions example/deployment.yaml

This file was deleted.

38 changes: 38 additions & 0 deletions example/kubernetes/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: godcpclient
name: godcpclient
spec:
replicas: 1
selector:
matchLabels:
app: godcpclient
template:
metadata:
labels:
app: godcpclient
spec:
automountServiceAccountToken: true # need for kubernetes leader election type
serviceAccount: godcpclient-sa
containers:
- image: docker.io/trendyoltech/godcpclient:latest # change this to your image
imagePullPolicy: Never
name: godcpclient
ports:
- containerPort: 8081 # need for kubernetes leader election type
name: rpc
- containerPort: 8080
name: http
env:
- name: POD_IP # need for kubernetes leader election type
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: POD_NAME # need for kubernetes leader election type
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
18 changes: 18 additions & 0 deletions example/kubernetes/role.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# need for kubernetes leader election type
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: godcpclient-role
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
- apiGroups:
- ''
resources:
- pods
verbs:
- patch
12 changes: 12 additions & 0 deletions example/kubernetes/role_binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# need for kubernetes leader election type
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: godcpclient-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: godcpclient-role
subjects:
- kind: ServiceAccount
name: godcpclient-sa
5 changes: 5 additions & 0 deletions example/kubernetes/service_account.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
apiVersion: v1
automountServiceAccountToken: true
kind: ServiceAccount
metadata:
name: godcpclient-sa
7 changes: 2 additions & 5 deletions helpers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ type ConfigRPC struct {
Port int `yaml:"port" default:"8081"`
}

type ConfigLogging struct {
Level string `yaml:"level" default:"info"`
}

type ConfigCheckpoint struct {
Type string `yaml:"type" default:"auto"`
AutoReset string `yaml:"autoReset" default:"earliest"`
Expand Down Expand Up @@ -89,7 +85,8 @@ type Config struct {
CollectionNames []string `yaml:"collectionNames"`
Password string `yaml:"password"`
Username string `yaml:"username"`
Logging ConfigLogging `yaml:"logging"`
SecureConnection bool `yaml:"secureConnection"`
RootCAPath string `yaml:"rootCAPath"`
Hosts []string `yaml:"hosts"`
Checkpoint ConfigCheckpoint `yaml:"checkpoint"`
Dcp ConfigDCP `yaml:"dcp"`
Expand Down
3 changes: 0 additions & 3 deletions helpers/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ metadata:
bucket: sample
checkpoint:
type: manual
logging:
level: debug
dcp:
listener:
bufferSize: 1024
Expand Down Expand Up @@ -48,7 +46,6 @@ func TestLoadConfig(t *testing.T) {
assert.Contains(t, config.CollectionNames, "_default")
assert.Equal(t, "sample", config.Metadata.Config["bucket"])
assert.Equal(t, "manual", config.Checkpoint.Type)
assert.Equal(t, "debug", config.Logging.Level)
assert.Equal(t, 1024, config.Dcp.Listener.BufferSize)
assert.Equal(t, "groupName", config.Dcp.Group.Name)
assert.Equal(t, "static", config.Dcp.Group.Membership.Type)
Expand Down

0 comments on commit 557be83

Please sign in to comment.