Skip to content

Commit

Permalink
feat(graph): restore graph api (#2979)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Jul 4, 2024
1 parent 2a52897 commit e82fe62
Show file tree
Hide file tree
Showing 12 changed files with 2,570 additions and 99 deletions.
9 changes: 8 additions & 1 deletion internal/server/rule_manager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -26,6 +26,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/pkg/store"
"github.com/lf-edge/ekuiper/v2/internal/server/promMetrics"
"github.com/lf-edge/ekuiper/v2/internal/topo/planner"
"github.com/lf-edge/ekuiper/v2/internal/topo/rule"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
Expand Down Expand Up @@ -458,6 +459,12 @@ func validateRule(name, ruleJson string) ([]string, bool, error) {
return nil, false, err
}
}
} else if rule.Graph != nil {
tp, err := planner.PlanByGraph(rule)
if err != nil {
return nil, false, fmt.Errorf("invalid rule graph: %v", err)
}
sources = tp.GetTopo().Sources
}
return sources, true, nil
}
306 changes: 306 additions & 0 deletions internal/server/rule_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ package server

import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/lf-edge/ekuiper/v2/internal/meta"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
store2 "github.com/lf-edge/ekuiper/v2/internal/pkg/store"
"github.com/lf-edge/ekuiper/v2/internal/processor"
"github.com/lf-edge/ekuiper/v2/internal/topo/graph"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/conf"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
)

type RuleMigrationProcessor struct {
Expand Down Expand Up @@ -65,6 +69,7 @@ type dependencies struct {

func ruleTraverse(rule *def.Rule, de *dependencies) {
sql := rule.Sql
ruleGraph := rule.Graph
if sql != "" {
stmt, err := xsql.GetStatementFromSql(sql)
if err != nil {
Expand Down Expand Up @@ -145,6 +150,162 @@ func ruleTraverse(rule *def.Rule, de *dependencies) {

// Rules
de.rules = append(de.rules, rule.Id)
} else {
for _, gn := range ruleGraph.Nodes {
switch gn.Type {
case "source":
sourceOption := &ast.Options{}
err := cast.MapToStruct(gn.Props, sourceOption)
if err != nil {
break
}
sourceOption.TYPE = gn.NodeType

de.sources = append(de.sources, sourceOption.TYPE)
// get config key
_, ok := de.sourceConfigKeys[sourceOption.TYPE]
if ok {
de.sourceConfigKeys[sourceOption.TYPE] = append(de.sourceConfigKeys[sourceOption.TYPE], sourceOption.CONF_KEY)
} else {
var confKeys []string
confKeys = append(confKeys, sourceOption.CONF_KEY)
de.sourceConfigKeys[sourceOption.TYPE] = confKeys
}
// get schema id
if sourceOption.SCHEMAID != "" {
r := strings.Split(sourceOption.SCHEMAID, ".")
de.schemas = append(de.schemas, sourceOption.FORMAT+"_"+r[0])
}
case "sink":
sinkType := gn.NodeType
props := gn.Props
de.sinks = append(de.sinks, sinkType)
resourceId, ok := props[conf.ResourceID].(string)
if ok {
_, ok := de.sinkConfigKeys[sinkType]
if ok {
de.sinkConfigKeys[sinkType] = append(de.sinkConfigKeys[sinkType], resourceId)
} else {
var confKeys []string
confKeys = append(confKeys, resourceId)
de.sinkConfigKeys[sinkType] = confKeys
}
}

format, ok := props["format"].(string)
if ok && format != "json" {
schemaId, ok := props["schemaId"].(string)
if ok {
r := strings.Split(schemaId, ".")
de.schemas = append(de.schemas, format+"_"+r[0])
}
}
case "operator":
nt := strings.ToLower(gn.NodeType)
switch nt {
case "function":
fop, err := parseFunc(gn.Props)
if err != nil {
break
}
ast.WalkFunc(fop, func(n ast.Node) bool {
switch f := n.(type) {
case *ast.Call:
de.functions = append(de.functions, f.Name)
}
return true
})
case "aggfunc":
fop, err := parseFunc(gn.Props)
if err != nil {
break
}
ast.WalkFunc(fop, func(n ast.Node) bool {
switch f := n.(type) {
case *ast.Call:
de.functions = append(de.functions, f.Name)
}
return true
})
case "filter":
fop, err := parseFilter(gn.Props)
if err != nil {
break
}
ast.WalkFunc(fop, func(n ast.Node) bool {
switch f := n.(type) {
case *ast.Call:
de.functions = append(de.functions, f.Name)
}
return true
})
case "pick":
pop, err := parsePick(gn.Props)
if err != nil {
break
}
ast.WalkFunc(pop, func(n ast.Node) bool {
switch f := n.(type) {
case *ast.Call:
de.functions = append(de.functions, f.Name)
}
return true
})
case "join":
jop, err := parseJoin(gn.Props)
if err != nil {
break
}
ast.WalkFunc(jop, func(n ast.Node) bool {
switch f := n.(type) {
case *ast.Call:
de.functions = append(de.functions, f.Name)
}
return true
})
case "groupby":
gop, err := parseGroupBy(gn.Props)
if err != nil {
break
}
ast.WalkFunc(gop, func(n ast.Node) bool {
switch f := n.(type) {
case *ast.Call:
de.functions = append(de.functions, f.Name)
}
return true
})
case "orderby":
oop, err := parseOrderBy(gn.Props)
if err != nil {
break
}
ast.WalkFunc(oop, func(n ast.Node) bool {
switch f := n.(type) {
case *ast.Call:
de.functions = append(de.functions, f.Name)
}
return true
})
case "switch":
opArray, err := parseSwitch(gn.Props)
if err != nil {
break
}
for _, op := range opArray {
ast.WalkFunc(op, func(n ast.Node) bool {
switch f := n.(type) {
case *ast.Call:
de.functions = append(de.functions, f.Name)
}
return true
})
}
}
default:
break
}
}
}
}

Expand Down Expand Up @@ -271,3 +432,148 @@ func (p *RuleMigrationProcessor) exportSelected(de *dependencies, config *Config

config.Uploads = uploadsExport()
}

func parsePick(props map[string]interface{}) (*ast.SelectStatement, error) {
n := &graph.Select{}
err := cast.MapToStruct(props, n)
if err != nil {
return nil, err
}
stmt, err := xsql.NewParser(strings.NewReader("select " + strings.Join(n.Fields, ",") + " from nonexist")).Parse()
if err != nil {
return nil, err
} else {
return stmt, nil
}
}

func parseFunc(props map[string]interface{}) (*ast.SelectStatement, error) {
m, ok := props["expr"]
if !ok {
return nil, errors.New("no expr")
}
funcExpr, ok := m.(string)
if !ok {
return nil, fmt.Errorf("expr %v is not string", m)
}
stmt, err := xsql.NewParser(strings.NewReader("select " + funcExpr + " from nonexist")).Parse()
if err != nil {
return nil, err
} else {
return stmt, nil
}
}

func parseFilter(props map[string]interface{}) (ast.Expr, error) {
m, ok := props["expr"]
if !ok {
return nil, errors.New("no expr")
}
conditionExpr, ok := m.(string)
if !ok {
return nil, fmt.Errorf("expr %v is not string", m)
}
p := xsql.NewParser(strings.NewReader(" where " + conditionExpr))
if exp, err := p.ParseCondition(); err != nil {
return nil, err
} else {
return exp, nil
}
}

func parseHaving(props map[string]interface{}) (ast.Expr, error) {
m, ok := props["expr"]
if !ok {
return nil, errors.New("no expr")
}
conditionExpr, ok := m.(string)
if !ok {
return nil, fmt.Errorf("expr %v is not string", m)
}
p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
if exp, err := p.ParseCondition(); err != nil {
return nil, err
} else {
return exp, nil
}
}

func parseSwitch(props map[string]interface{}) ([]ast.Expr, error) {
n := &graph.Switch{}
err := cast.MapToStruct(props, n)
if err != nil {
return nil, err
}
if len(n.Cases) == 0 {
return nil, fmt.Errorf("switch node must have at least one case")
}
caseExprs := make([]ast.Expr, len(n.Cases))
for i, c := range n.Cases {
p := xsql.NewParser(strings.NewReader("where " + c))
if exp, err := p.ParseCondition(); err != nil {
return nil, fmt.Errorf("parse case %d error: %v", i, err)
} else {
if exp != nil {
caseExprs[i] = exp
}
}
}
return caseExprs, nil
}

func parseOrderBy(props map[string]interface{}) (*ast.SelectStatement, error) {
n := &graph.Orderby{}
err := cast.MapToStruct(props, n)
if err != nil {
return nil, err
}
stmt := "SELECT * FROM unknown ORDER BY"
for _, s := range n.Sorts {
stmt += " " + s.Field + " "
if s.Desc {
stmt += "DESC"
}
}
p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
if err != nil {
return nil, fmt.Errorf("invalid order by statement error: %v", err)
} else {
return p, nil
}
}

func parseGroupBy(props map[string]interface{}) (*ast.SelectStatement, error) {
n := &graph.Groupby{}
err := cast.MapToStruct(props, n)
if err != nil {
return nil, err
}
if len(n.Dimensions) == 0 {
return nil, fmt.Errorf("groupby must have at least one dimension")
}
stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
if err != nil {
return nil, fmt.Errorf("invalid join statement error: %v", err)
} else {
return p, nil
}
}

func parseJoin(props map[string]interface{}) (*ast.SelectStatement, error) {
n := &graph.Join{}
err := cast.MapToStruct(props, n)
if err != nil {
return nil, err
}
stmt := "SELECT * FROM " + n.From
for _, join := range n.Joins {
stmt += " " + join.Type + " JOIN ON " + join.On
}
p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
if err != nil {
return nil, fmt.Errorf("invalid join statement error: %v", err)
} else {
return p, nil
}
}
Loading

0 comments on commit e82fe62

Please sign in to comment.