Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support manage connection #2867

Merged
merged 24 commits into from
May 27, 2024
Merged
271 changes: 271 additions & 0 deletions internal/io/connection/connection_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connection

import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/pkg/store"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/pkg/kv"
)

type RegisterConnection func(ctx api.StreamContext, id string, props map[string]any) (Connection, error)

var ConnectionRegister map[string]RegisterConnection

func init() {
ConnectionRegister = map[string]RegisterConnection{}
ConnectionRegister["mock"] = createMockConnection
}

var isTest bool

type Connection interface {
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
Ping(ctx api.StreamContext) error
Close(ctx api.StreamContext)
Attach(ctx api.StreamContext)
DetachSub(ctx api.StreamContext, props map[string]any)
DetachPub(ctx api.StreamContext, props map[string]any)
Ref(ctx api.StreamContext) int
}

func GetAllConnectionsID() []string {
globalConnectionManager.RLock()
defer globalConnectionManager.RUnlock()
ids := make([]string, 0)
for key := range globalConnectionManager.connectionPool {
ids = append(ids, key)
}
return ids
}

func PingConnection(ctx api.StreamContext, id string) error {
conn, err := GetNameConnection(id)
if err != nil {
return err

Check warning on line 62 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L62

Added line #L62 was not covered by tests
}
return conn.Ping(ctx)
}

func GetNameConnection(selId string) (Connection, error) {
if selId == "" {
return nil, fmt.Errorf("connection id should be defined")

Check warning on line 69 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L69

Added line #L69 was not covered by tests
}
globalConnectionManager.RLock()
defer globalConnectionManager.RUnlock()
meta, ok := globalConnectionManager.connectionPool[selId]
if !ok {
return nil, fmt.Errorf("connection %s not existed", selId)
}
return meta.conn, nil
}

func CreateNamedConnection(ctx api.StreamContext, id, typ string, props map[string]any) (Connection, error) {
if id == "" || typ == "" {
return nil, fmt.Errorf("connection id and type should be defined")

Check warning on line 82 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L82

Added line #L82 was not covered by tests
}
globalConnectionManager.Lock()
defer globalConnectionManager.Unlock()
_, ok := globalConnectionManager.connectionPool[id]
if ok {
return nil, fmt.Errorf("connection %v already been created", id)

Check warning on line 88 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L88

Added line #L88 was not covered by tests
}
meta := ConnectionMeta{
ID: id,
Typ: typ,
Props: props,
}
if !isTest {
b, err := json.Marshal(meta)
if err != nil {
return nil, err

Check warning on line 98 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L96-L98

Added lines #L96 - L98 were not covered by tests
}
if err := globalConnectionManager.store.Set(id, string(b)); err != nil {
return nil, err

Check warning on line 101 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L100-L101

Added lines #L100 - L101 were not covered by tests
}
}
conn, err := createNamedConnection(ctx, meta)
if err != nil {
return nil, err

Check warning on line 106 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L106

Added line #L106 was not covered by tests
}
meta.conn = conn
globalConnectionManager.connectionPool[id] = meta
return conn, nil
}

func CreateNonStoredConnection(ctx api.StreamContext, id, typ string, props map[string]any) (Connection, error) {
if id == "" || typ == "" {
return nil, fmt.Errorf("connection id and type should be defined")

Check warning on line 115 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L115

Added line #L115 was not covered by tests
}
globalConnectionManager.Lock()
defer globalConnectionManager.Unlock()
_, ok := globalConnectionManager.connectionPool[id]
if ok {
return nil, fmt.Errorf("connection %v already been created", id)

Check warning on line 121 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L121

Added line #L121 was not covered by tests
}
meta := ConnectionMeta{
ID: id,
Typ: typ,
Props: props,
}
conn, err := createNamedConnection(ctx, meta)
if err != nil {
return nil, err

Check warning on line 130 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L130

Added line #L130 was not covered by tests
}
meta.conn = conn
globalConnectionManager.connectionPool[id] = meta
return conn, nil
}

func DropNonStoredConnection(ctx api.StreamContext, selId string) error {
if selId == "" {
return fmt.Errorf("connection id should be defined")

Check warning on line 139 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L139

Added line #L139 was not covered by tests
}
globalConnectionManager.Lock()
defer globalConnectionManager.Unlock()
meta, ok := globalConnectionManager.connectionPool[selId]
if !ok {
return nil

Check warning on line 145 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L145

Added line #L145 was not covered by tests
}
conn := meta.conn
conn.Close(ctx)
delete(globalConnectionManager.connectionPool, selId)
return nil
}

func createNamedConnection(ctx api.StreamContext, meta ConnectionMeta) (Connection, error) {
var conn Connection
var err error
connRegister, ok := ConnectionRegister[strings.ToLower(meta.Typ)]
if !ok {
return nil, fmt.Errorf("unknown connection type")

Check warning on line 158 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L158

Added line #L158 was not covered by tests
}
conn, err = connRegister(ctx, meta.ID, meta.Props)
if err != nil {
return nil, err

Check warning on line 162 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L162

Added line #L162 was not covered by tests
}
return conn, nil
}

func DropNameConnection(ctx api.StreamContext, selId string) error {
if selId == "" {
return fmt.Errorf("connection id should be defined")

Check warning on line 169 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L169

Added line #L169 was not covered by tests
}
globalConnectionManager.Lock()
defer globalConnectionManager.Unlock()
meta, ok := globalConnectionManager.connectionPool[selId]
if !ok {
return nil

Check warning on line 175 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L175

Added line #L175 was not covered by tests
}
conn := meta.conn
if conn.Ref(ctx) > 0 {
return fmt.Errorf("connection %s can't be dropped due to reference", selId)
}
if !isTest {
err := globalConnectionManager.store.Delete(selId)
if err != nil {
return fmt.Errorf("drop connection %s failed, err:%v", selId, err)

Check warning on line 184 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L182-L184

Added lines #L182 - L184 were not covered by tests
}
}
conn.Close(ctx)
delete(globalConnectionManager.connectionPool, selId)
return nil
}

var globalConnectionManager *ConnectionManager

func InitConnectionManagerInTest() {
isTest = true
InitConnectionManager()
}

func InitConnectionManager() error {
globalConnectionManager = &ConnectionManager{
connectionPool: make(map[string]ConnectionMeta),
}
if !isTest {
globalConnectionManager.store, _ = store.GetKV("connectionMeta")
kvs, _ := globalConnectionManager.store.All()
for connectionID, raw := range kvs {
meta := ConnectionMeta{}
err := json.Unmarshal([]byte(raw), &meta)
if err != nil {
return fmt.Errorf("initialize connection:%v failed, err:%v", connectionID, err)

Check warning on line 210 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L204-L210

Added lines #L204 - L210 were not covered by tests
}
conn, err := createNamedConnection(context.Background(), meta)
if err != nil {
return fmt.Errorf("initialize connection:%v failed, err:%v", connectionID, err)

Check warning on line 214 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L212-L214

Added lines #L212 - L214 were not covered by tests
}
meta.conn = conn
globalConnectionManager.connectionPool[connectionID] = meta

Check warning on line 217 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L216-L217

Added lines #L216 - L217 were not covered by tests
}
}
return nil
}

type ConnectionManager struct {
sync.RWMutex
store kv.KeyValue
ngjaying marked this conversation as resolved.
Show resolved Hide resolved
connectionPool map[string]ConnectionMeta
}

type ConnectionMeta struct {
ID string `json:"id"`
Typ string `json:"typ"`
Props map[string]any `json:"props"`
conn Connection `json:"-"`
}

type mockConnection struct {
ngjaying marked this conversation as resolved.
Show resolved Hide resolved
id string
ref int
}

func (m *mockConnection) Ping(ctx api.StreamContext) error {
return nil
}

func (m *mockConnection) Close(ctx api.StreamContext) {
return
}

func (m *mockConnection) Attach(ctx api.StreamContext) {
m.ref++
return
}

func (m *mockConnection) DetachSub(ctx api.StreamContext, props map[string]any) {
m.ref--
return
}

func (m *mockConnection) DetachPub(ctx api.StreamContext, props map[string]any) {
m.ref--
return
}

func (m *mockConnection) Ref(ctx api.StreamContext) int {
return m.ref
}

func createMockConnection(ctx api.StreamContext, id string, props map[string]any) (Connection, error) {
m := &mockConnection{id: id, ref: 0}
return m, nil
}
51 changes: 51 additions & 0 deletions internal/io/connection/connection_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connection

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/topo/context"
)

func TestConnection(t *testing.T) {
InitConnectionManagerInTest()
ctx := context.Background()
conn, err := CreateNamedConnection(ctx, "id1", "mock", nil)
require.NoError(t, err)
require.NotNil(t, conn)
require.NoError(t, conn.Ping(ctx))
require.Equal(t, 0, conn.Ref(ctx))
conn.Attach(ctx)
require.Equal(t, 1, conn.Ref(ctx))
conn.Attach(ctx)
require.Equal(t, 2, conn.Ref(ctx))
conn.DetachPub(ctx, nil)
require.Equal(t, 1, conn.Ref(ctx))
err = DropNameConnection(ctx, "id1")
require.Error(t, err)
conn2, err := GetNameConnection("id1")
require.NoError(t, err)
require.NotNil(t, conn2)
conn.DetachSub(ctx, nil)
require.Equal(t, 0, conn.Ref(ctx))
err = DropNameConnection(ctx, "id1")
require.NoError(t, err)
conn3, err := GetNameConnection("id1")
require.Error(t, err)
require.Nil(t, conn3)
}
Loading
Loading