Skip to content

Commit

Permalink
fix(topo): detach subtopo from closed rule (#3010)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Jul 16, 2024
1 parent 0237f0f commit 88fd5da
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 15 deletions.
3 changes: 2 additions & 1 deletion internal/topo/node/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

type Emitter interface {
AddOutput(chan<- interface{}, string) error
RemoveOutput(string) error
}

type Collector interface {
Expand Down Expand Up @@ -83,7 +84,7 @@ type MergeableTopo interface {
// SubMetrics return the metrics of the sub nodes
SubMetrics() ([]string, []any)
// Close notifies subtopo to deref
Close(ctx api.StreamContext, ruleId string)
Close(ctx api.StreamContext, ruleId string, runId int)
}

type SchemaNode interface {
Expand Down
28 changes: 22 additions & 6 deletions internal/topo/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package node
import (
"errors"
"fmt"
"strings"
"sync"

"github.com/lf-edge/ekuiper/contract/v2/api"
Expand Down Expand Up @@ -56,13 +57,28 @@ func newDefaultNode(name string, options *def.RuleOption) *defaultNode {
}
}

func (o *defaultNode) AddOutput(output chan<- interface{}, name string) error {
func (o *defaultNode) AddOutput(output chan<- any, name string) error {
o.outputMu.Lock()
defer o.outputMu.Unlock()
o.outputs[name] = output
return nil
}

func (o *defaultNode) RemoveOutput(name string) error {
o.outputMu.Lock()
defer o.outputMu.Unlock()
namePre := name + "_"
for n := range o.outputs {
if strings.HasPrefix(n, namePre) {
delete(o.outputs, n)
if o.ctx != nil {
o.ctx.GetLogger().Infof("Remove output %s from %s", n, o.name)
}
}
}
return nil
}

func (o *defaultNode) GetName() string {
return o.name
}
Expand All @@ -84,11 +100,11 @@ func (o *defaultNode) RemoveMetrics(ruleId string) {
}
}

func (o *defaultNode) Broadcast(val interface{}) {
func (o *defaultNode) Broadcast(val any) {
o.BroadcastCustomized(val, o.doBroadcast)
}

func (o *defaultNode) BroadcastCustomized(val interface{}, broadcastFunc func(val any)) {
func (o *defaultNode) BroadcastCustomized(val any, broadcastFunc func(val any)) {
if _, ok := val.(error); ok && !o.sendError {
return
}
Expand All @@ -104,7 +120,7 @@ func (o *defaultNode) BroadcastCustomized(val interface{}, broadcastFunc func(va
return
}

func (o *defaultNode) doBroadcast(val interface{}) {
func (o *defaultNode) doBroadcast(val any) {
o.outputMu.RLock()
defer o.outputMu.RUnlock()
l := len(o.outputs)
Expand Down Expand Up @@ -151,7 +167,7 @@ func newDefaultSinkNode(name string, options *def.RuleOption) *defaultSinkNode {
}
}

func (o *defaultSinkNode) GetInput() (chan<- interface{}, string) {
func (o *defaultSinkNode) GetInput() (chan<- any, string) {
return o.input, o.name
}

Expand Down Expand Up @@ -236,7 +252,7 @@ func (o *defaultSinkNode) handleEof(ctx api.StreamContext, d xsql.EOFTuple) {
}
}

func SourcePing(sourceType string, config map[string]interface{}) error {
func SourcePing(sourceType string, config map[string]any) error {
source, err := io.Source(sourceType)
if err != nil {
return err
Expand Down
36 changes: 36 additions & 0 deletions internal/topo/node/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package node

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
)

func TestOutputs(t *testing.T) {
n := newDefaultNode("test", &def.RuleOption{})
err := n.AddOutput(make(chan<- any), "rule.1_test")
assert.NoError(t, err)
err = n.AddOutput(make(chan<- any), "rule.2_test")
assert.NoError(t, err)
err = n.RemoveOutput("rule.1")
assert.NoError(t, err)
err = n.RemoveOutput("rule.4")
assert.NoError(t, err)
assert.Equal(t, 1, len(n.outputs))
}
7 changes: 6 additions & 1 deletion internal/topo/subtopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (s *SrcSubTopo) AddOutput(output chan<- interface{}, name string) error {
return s.tail.AddOutput(output, name)
}

func (s *SrcSubTopo) RemoveOutput(name string) error {
return s.tail.RemoveOutput(name)
}

func (s *SrcSubTopo) Open(ctx api.StreamContext, parentErrCh chan<- error) {
// Update the ref count
if _, loaded := s.refRules.LoadOrStore(ctx.GetRuleId(), parentErrCh); !loaded {
Expand Down Expand Up @@ -171,7 +175,7 @@ func (s *SrcSubTopo) StoreSchema(ruleID, dataSource string, schema map[string]*a
}
}

func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string) {
func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string, runId int) {
if _, ok := s.refRules.LoadAndDelete(ruleId); ok {
s.refCount.Add(-1)
if s.refCount.Load() == 0 {
Expand All @@ -186,6 +190,7 @@ func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string) {
}
}
}
_ = s.RemoveOutput(fmt.Sprintf("%s.%d", ruleId, runId))
}

// RemoveMetrics is called when the rule is deleted
Expand Down
22 changes: 17 additions & 5 deletions internal/topo/subtopo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ func TestSubtopoLC(t *testing.T) {
assert.Equal(t, []checkpoint.StreamTask{srcNode}, sources)
assert.Equal(t, []checkpoint.NonSourceTask{opNode}, ops)
// Stop
subTopo.Close(ctx1, "rule1")
subTopo.Close(ctx1, "rule1", 1)
assert.Equal(t, int32(1), subTopo.refCount.Load())
assert.Equal(t, 1, mlen(&subTopoPool))
subTopo2.Close(ctx2, "rule2")
subTopo2.Close(ctx2, "rule2", 2)
assert.Equal(t, int32(0), subTopo.refCount.Load())
assert.Equal(t, 0, mlen(&subTopoPool))
assert.Equal(t, 2, len(subTopo.schemaReg))
Expand All @@ -138,7 +138,7 @@ func TestSubtopoRunError(t *testing.T) {
subTopo.Open(ctx1, make(chan error))
assert.Equal(t, int32(1), subTopo.refCount.Load())
assert.Equal(t, true, subTopo.opened.Load())
subTopo.Close(ctx1, "rule1")
subTopo.Close(ctx1, "rule1", 1)
assert.Equal(t, int32(0), subTopo.refCount.Load())
assert.Equal(t, 0, mlen(&subTopoPool))
time.Sleep(10 * time.Millisecond)
Expand All @@ -156,14 +156,14 @@ func TestSubtopoRunError(t *testing.T) {
select {
case err := <-errCh1:
assert.Equal(t, assert.AnError, err)
subTopo.Close(ctx1, "rule1")
subTopo.Close(ctx1, "rule1", 1)
case <-time.After(1 * time.Second):
assert.Fail(t, "Should receive error")
}
select {
case err := <-errCh2:
assert.Equal(t, assert.AnError, err)
subTopo2.Close(ctx2, "rule2")
subTopo2.Close(ctx2, "rule2", 2)
case <-time.After(1 * time.Second):
assert.Fail(t, "Should receive error")
}
Expand Down Expand Up @@ -239,6 +239,11 @@ func (m *mockSrc) AddOutput(c chan<- interface{}, s string) error {
return nil
}

func (m *mockSrc) RemoveOutput(s string) error {
m.outputs = m.outputs[1:]
return nil
}

func (m *mockSrc) Open(ctx api.StreamContext, errCh chan<- error) {
if m.runCount%3 != 0 {
fmt.Printf("sent error for %d \n", m.runCount)
Expand Down Expand Up @@ -273,6 +278,13 @@ type mockOp struct {
schemaCount int
}

func (m *mockOp) RemoveOutput(s string) error {
if len(m.outputs) > 0 {
m.outputs = m.outputs[1:]
}
return nil
}

func (m *mockOp) AddOutput(c chan<- interface{}, s string) error {
m.outputs = append(m.outputs, c)
return nil
Expand Down
11 changes: 9 additions & 2 deletions internal/topo/topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ import (
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

var uid atomic.Uint32

// Topo is the runtime DAG for a rule
// It only run once. If the rule restarts, another topo is created.
type Topo struct {
streams []string
sources []node.DataSourceNode
Expand All @@ -48,6 +52,7 @@ type Topo struct {
drain chan error
ops []node.OperatorNode
name string
runId int
options *def.RuleOption
store api.Store
coordinator *checkpoint.Coordinator
Expand All @@ -59,8 +64,10 @@ type Topo struct {
}

func NewWithNameAndOptions(name string, options *def.RuleOption) (*Topo, error) {
id := uid.Add(1)
tp := &Topo{
name: name,
runId: int(id),
options: options,
topo: &def.PrintableTopo{
Sources: make([]string, 0),
Expand Down Expand Up @@ -115,7 +122,7 @@ func (s *Topo) Cancel() {
s.coordinator = nil
for _, src := range s.sources {
if rt, ok := src.(node.MergeableTopo); ok {
rt.Close(s.ctx, s.name)
rt.Close(s.ctx, s.name, s.runId)
}
}
}
Expand Down Expand Up @@ -158,7 +165,7 @@ func (s *Topo) AddOperator(inputs []node.Emitter, operator node.OperatorNode) *T
ch, opName := operator.GetInput()
for _, input := range inputs {
// add rule id to make operator name unique
_ = input.AddOutput(ch, fmt.Sprintf("%s_%s", s.name, opName))
_ = input.AddOutput(ch, fmt.Sprintf("%s.%d_%s", s.name, s.runId, opName))
operator.AddInputCount()
switch rt := input.(type) {
case node.MergeableTopo:
Expand Down

0 comments on commit 88fd5da

Please sign in to comment.