diff --git a/.gitignore b/.gitignore index c5e82d74..387675ab 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -bin \ No newline at end of file +bin +.idea +var +go.sum \ No newline at end of file diff --git a/README.md b/README.md index 05a4878d..504723f4 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,22 @@ filter = ["id", "name"] In the above example, we will only sync MySQL table tfiler's columns `id` and `name` to Elasticsearch. +## Filter mysql ddl commands +You can use `shield_ddl` to skip mysql DDL command + +``` +[[rule]] +schema = "choujiang" +table = "c_21" +index = "" +type = "" +# shield ddl mysql command +# example skip one command : delete +# example skip many command : delete,update +shield_ddl = "delete" +``` +In the above example , we will skip mysql delete command + ## Ignore table without a primary key When you sync table without a primary key, you can see below error message. ``` diff --git a/etc/river.toml b/etc/river.toml index c390c862..5bc30d9d 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -14,15 +14,15 @@ es_user = "" es_pass = "" # Path to store data, like master.info, if not set or empty, -# we must use this to support breakpoint resume syncing. -# TODO: support other storage, like etcd. +# we must use this to support breakpoint resume syncing. +# TODO: support other storage, like etcd. data_dir = "./var" # Inner Http status address stat_addr = "127.0.0.1:12800" stat_path = "/metrics" -# pseudo server id like a slave +# pseudo server id like a slave server_id = 1001 # mysql or mariadb @@ -57,7 +57,7 @@ tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] # Below is for special rule mapping # Very simple example -# +# # desc t; # +-------+--------------+------+-----+---------+-------+ # | Field | Type | Null | Key | Default | Extra | @@ -65,15 +65,16 @@ tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] # | id | int(11) | NO | PRI | NULL | | # | name | varchar(256) | YES | | NULL | | # +-------+--------------+------+-----+---------+-------+ -# +# # The table `t` will be synced to ES index `test` and type `t`. [[rule]] schema = "test" table = "t" index = "test" type = "t" +shield_ddl = "" -# Wildcard table rule, the wildcard table must be in source tables +# Wildcard table rule, the wildcard table must be in source tables # All tables which match the wildcard format will be synced to ES index `test` and type `t`. # In this example, all tables must have same schema with above table `t`; [[rule]] @@ -82,7 +83,7 @@ table = "t_[0-9]{4}" index = "test" type = "t" -# Simple field rule +# Simple field rule # # desc tfield; # +----------+--------------+------+-----+---------+-------+ @@ -102,12 +103,12 @@ type = "tfield" [rule.field] # Map column `id` to ES field `es_id` id="es_id" -# Map column `tags` to ES field `es_tags` with array type +# Map column `tags` to ES field `es_tags` with array type tags="es_tags,list" # Map column `keywords` to ES with array type keywords=",list" -# Filter rule +# Filter rule # # desc tfilter; # +-------+--------------+------+-----+---------+-------+ @@ -145,5 +146,5 @@ table = "tid_[0-9]{4}" index = "test" type = "t" # The es doc's id will be `id`:`tag` -# It is useful for merge muliple table into one type while theses tables have same PK +# It is useful for merge muliple table into one type while theses tables have same PK id = ["id", "tag"] diff --git a/go.sum b/go.sum deleted file mode 100644 index 97c0717c..00000000 --- a/go.sum +++ /dev/null @@ -1,18 +0,0 @@ -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 h1:dMIPRDg6gi7CUp0Kj2+HxqJ5kTr1iAdzsXYIrLCNSmU= -github.com/juju/errors v0.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= -github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4= -github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= -github.com/siddontang/go-mysql v0.0.0-20190123011128-88e9cd7f6643 h1:yzg8+Cip1iDhy6GGS1zKflqOybgRc4xp82eYwQrP+DU= -github.com/siddontang/go-mysql v0.0.0-20190123011128-88e9cd7f6643/go.mod h1:/b8ZcWjAShCcHp2dWpjb1vTlNyiG03UeHEQr2jteOpI= -github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5 h1:5Nr7spTeY+ziXzqk/9p+GLnvH4rIjp9BX+aRaYDbR44= -github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5/go.mod h1:/b8ZcWjAShCcHp2dWpjb1vTlNyiG03UeHEQr2jteOpI= diff --git a/river/rule.go b/river/rule.go index fb7cccb7..8bd56af9 100644 --- a/river/rule.go +++ b/river/rule.go @@ -32,6 +32,9 @@ type Rule struct { // Elasticsearch pipeline // To pre-process documents before indexing Pipeline string `toml:"pipeline"` + + // shield ddl command + ShieldDDL string `toml:"shield_ddl"` } func newDefaultRule(schema string, table string) *Rule { @@ -83,3 +86,33 @@ func (r *Rule) CheckFilter(field string) bool { } return false } + +// CheckHasShieldCommands Check Has Shield commands +func (r *Rule) CheckHasShieldCommands() bool { + return len(r.ShieldDDL) > 0 +} + +// CheckSkipShieldCommand check continue command +func (r *Rule) CheckSkipShieldCommand(action string) bool { + if len(r.ShieldDDL) < 1 { + return false + } + // get command []string + shieldDDLs := r.getManyShieldCommands() + for _, v := range shieldDDLs { + // action == shieldDDL + if v == action { + return true + } + } + return false +} + +func (r *Rule) getManyShieldCommands() []string { + // not have shield DDL command + if len(r.ShieldDDL) < 0 { + return []string{} + } + // command example delete,update + return strings.Split(r.ShieldDDL, ",") +} diff --git a/river/sync.go b/river/sync.go index acbcc8b0..3359d5d5 100644 --- a/river/sync.go +++ b/river/sync.go @@ -70,6 +70,12 @@ func (h *eventHandler) OnRow(e *canal.RowsEvent) error { return nil } + // check continue command + if rule.CheckSkipShieldCommand(e.Action) { + log.Infof("rules have continue ddl command , command event is (%v) , rows (%v)", e.Action, e.Rows) + return nil + } + var reqs []*elastic.BulkRequest var err error switch e.Action {