Add shield mysql ddl command #401

Open · wants to merge 8 commits into base: master
5 changes: 4 additions & 1 deletion .gitignore
@@ -1 +1,4 @@
bin
.idea
var
go.sum
16 changes: 16 additions & 0 deletions README.md
@@ -174,6 +174,22 @@ filter = ["id", "name"]

In the above example, we will only sync MySQL table `tfilter`'s columns `id` and `name` to Elasticsearch.

## Filter MySQL DDL commands
You can use `shield_ddl` to skip MySQL DDL commands.

```
[[rule]]
schema = "choujiang"
table = "c_21"
index = ""
type = ""
# Shield MySQL DDL commands.
# Skip one command: delete
# Skip several commands: delete,update
shield_ddl = "delete"
```
In the above example, we will skip the MySQL `delete` command.
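Several commands can be shielded at once by listing them comma-separated, as the comments in the rule suggest. A hedged sketch reusing the same rule:

```
[[rule]]
schema = "choujiang"
table = "c_21"
shield_ddl = "delete,update"
```

With this setting, both `delete` and `update` events for table `c_21` are skipped.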

## Ignore table without a primary key
When you sync table without a primary key, you can see below error message.
```
21 changes: 11 additions & 10 deletions etc/river.toml
Expand Up @@ -14,15 +14,15 @@ es_user = ""
es_pass = ""

# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
# TODO: support other storage, like etcd.
data_dir = "./var"

# Inner Http status address
stat_addr = "127.0.0.1:12800"
stat_path = "/metrics"

# pseudo server id like a slave
server_id = 1001

# mysql or mariadb
@@ -57,23 +57,24 @@ tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
# Below is for special rule mapping

# Very simple example
#
# desc t;
# +-------+--------------+------+-----+---------+-------+
# | Field | Type | Null | Key | Default | Extra |
# +-------+--------------+------+-----+---------+-------+
# | id | int(11) | NO | PRI | NULL | |
# | name | varchar(256) | YES | | NULL | |
# +-------+--------------+------+-----+---------+-------+
#
# The table `t` will be synced to ES index `test` and type `t`.
[[rule]]
schema = "test"
table = "t"
index = "test"
type = "t"
shield_ddl = ""

# Wildcard table rule, the wildcard table must be in source tables
# All tables which match the wildcard format will be synced to ES index `test` and type `t`.
# In this example, all tables must have same schema with above table `t`;
[[rule]]
@@ -82,7 +83,7 @@ table = "t_[0-9]{4}"
index = "test"
type = "t"

# Simple field rule
#
# desc tfield;
# +----------+--------------+------+-----+---------+-------+
@@ -102,12 +103,12 @@ type = "tfield"
[rule.field]
# Map column `id` to ES field `es_id`
id="es_id"
# Map column `tags` to ES field `es_tags` with array type
tags="es_tags,list"
# Map column `keywords` to ES with array type
keywords=",list"

# Filter rule
#
# desc tfilter;
# +-------+--------------+------+-----+---------+-------+
@@ -145,5 +146,5 @@ table = "tid_[0-9]{4}"
index = "test"
type = "t"
# The es doc's id will be `id`:`tag`
# It is useful for merging multiple tables into one type while these tables have the same PK
id = ["id", "tag"]
18 changes: 0 additions & 18 deletions go.sum

This file was deleted.

33 changes: 33 additions & 0 deletions river/rule.go
@@ -32,6 +32,9 @@ type Rule struct {
// Elasticsearch pipeline
// To pre-process documents before indexing
Pipeline string `toml:"pipeline"`

// ShieldDDL is a comma-separated list of DDL commands to skip
ShieldDDL string `toml:"shield_ddl"`
}

func newDefaultRule(schema string, table string) *Rule {
@@ -83,3 +86,33 @@ func (r *Rule) CheckFilter(field string) bool {
}
return false
}

// CheckHasShieldCommands reports whether any shield commands are configured
func (r *Rule) CheckHasShieldCommands() bool {
return len(r.ShieldDDL) > 0
}

// CheckSkipShieldCommand reports whether the given action should be skipped
func (r *Rule) CheckSkipShieldCommand(action string) bool {
if len(r.ShieldDDL) < 1 {
return false
}
// split the configured commands into a slice
shieldDDLs := r.getManyShieldCommands()
for _, v := range shieldDDLs {
// action == shieldDDL
if v == action {
return true
}
}
return false
}

func (r *Rule) getManyShieldCommands() []string {
// no shield DDL commands configured
if len(r.ShieldDDL) == 0 {
return []string{}
}
// comma-separated command list, e.g. "delete,update"
return strings.Split(r.ShieldDDL, ",")
}
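The skip logic added above reduces to a comma-split plus an exact membership test. As a standalone, hedged sketch (the `shieldsAction` helper below is hypothetical, written only to mirror `CheckSkipShieldCommand` and `getManyShieldCommands`):

```go
package main

import (
	"fmt"
	"strings"
)

// shieldsAction mirrors Rule.CheckSkipShieldCommand: it reports whether
// action appears in the comma-separated shieldDDL list.
func shieldsAction(shieldDDL, action string) bool {
	if len(shieldDDL) == 0 {
		return false
	}
	for _, v := range strings.Split(shieldDDL, ",") {
		if v == action {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(shieldsAction("delete,update", "delete")) // true
	fmt.Println(shieldsAction("delete,update", "insert")) // false
	fmt.Println(shieldsAction("", "delete"))              // false
}
```

Note that the match is exact and case-sensitive: a value like `"delete, update"` (with a space) would not shield the `update` action, since `strings.Split` keeps the leading space.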
6 changes: 6 additions & 0 deletions river/sync.go
@@ -70,6 +70,12 @@ func (h *eventHandler) OnRow(e *canal.RowsEvent) error {
return nil
}

// skip actions shielded by this rule
if rule.CheckSkipShieldCommand(e.Action) {
log.Infof("rule shields command (%v), skipping rows (%v)", e.Action, e.Rows)
return nil
}

var reqs []*elastic.BulkRequest
var err error
switch e.Action {