Skip to content

Commit

Permalink
add workers feature
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Dec 6, 2016
1 parent ac86d41 commit 331db55
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ The following defaults are used for missing config values:
max-file-size -> 0
file-highlighting -> false
file-namespaces -> nil
worker -> nil
workers -> nil
verbose -> false

When `resume` is true, monstache writes the timestamp of mongodb operations it has successfully synced to elasticsearch
Expand Down Expand Up @@ -150,6 +152,10 @@ When `dropped-databases` is false monstache will not delete the mapped indexes i

When `dropped-collections` is false monstache will not delete the mapped index in elasticsearch if a mongodb collection is dropped

When `worker` is given monstache will enter multi-worker mode and will require you to also provide the config option `workers`. Use this mode to run
multiple monstache processes and distribute the work between them. In this mode monstache will ensure that each mongo document id always goes to the
same worker and none of the other workers. See the section [workers](#workers) for more information.

### Config Syntax ###

For information on the syntax of the mongodb URL see [Standard Connection String Format](https://docs.mongodb.com/v3.0/reference/connection-string/#standard-connection-string-format)
Expand Down Expand Up @@ -424,3 +430,21 @@ For elasticsearch version 5 and above...
}
}]

<a name="workers"></a>
### Multiple Workers ###

You can run multiple monstache processes and distribute the work between them. First configure
the names of all the workers in a shared config.toml file.

workers = ["Tom", "Dick", "Harry"]

In this case we have 3 workers. Now we can start 3 monstache processes and give each one of the worker
names.

monstache -f config.toml -worker Tom
monstache -f config.toml -worker Dick
monstache -f config.toml -worker Harry

monstache will hash the id of each document using consistent hashing so that each id is handled by only
one of the available workers.

17 changes: 17 additions & 0 deletions monstache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/robertkrimen/otto"
_ "github.com/robertkrimen/otto/underscore"
"github.com/rwynn/gtm"
"github.com/rwynn/gtm/consistent"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"io"
Expand Down Expand Up @@ -89,6 +90,8 @@ type configOptions struct {
Script []javascript
Mapping []indexTypeMapping
FileNamespaces []string `toml:"file-namespaces"`
Workers []string
Worker string
}

func TestElasticSearchConn(conn *elastigo.Conn, configuration *configOptions) (err error) {
Expand Down Expand Up @@ -398,6 +401,7 @@ func (configuration *configOptions) ParseCommandLineFlags() *configOptions {
flag.BoolVar(&configuration.IndexFiles, "index-files", false, "True to index gridfs files into elasticsearch. Requires the elasticsearch mapper-attachments (deprecated) or ingest-attachment plugin")
flag.BoolVar(&configuration.FileHighlighting, "file-highlighting", false, "True to enable the ability to highlight search times for a file query")
flag.StringVar(&configuration.ResumeName, "resume-name", "", "Name under which to load/store the resume state. Defaults to 'default'")
flag.StringVar(&configuration.Worker, "worker", "", "The name of this worker in a multi-worker configuration")
flag.StringVar(&configuration.NsRegex, "namespace-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which match are synched to elasticsearch")
flag.StringVar(&configuration.NsRegex, "namespace-exclude-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which do not match are synched to elasticsearch")
flag.Parse()
Expand Down Expand Up @@ -530,6 +534,10 @@ func (configuration *configOptions) LoadConfigFile() *configOptions {
configuration.FileNamespaces = tomlConfig.FileNamespaces
tomlConfig.LoadGridFsConfig()
}
if configuration.Worker == "" {
configuration.Worker = tomlConfig.Worker
}
configuration.Workers = tomlConfig.Workers
tomlConfig.LoadScripts()
tomlConfig.LoadIndexTypes()
}
Expand Down Expand Up @@ -782,6 +790,15 @@ func main() {
if configuration.NsExcludeRegex != "" {
filterChain = append(filterChain, FilterInverseWithRegex(configuration.NsExcludeRegex))
}
if configuration.Worker != "" {
workerFilter, err := consistent.ConsistentHashFilter(configuration.Worker, configuration.Workers)
if err != nil {
panic(err)
}
filterChain = append(filterChain, workerFilter)
} else if configuration.Workers != nil {
panic("workers configured but this worker is undefined. worker must be set to one of the workers.")
}
filter = gtm.ChainOpFilters(filterChain...)

ops, errs := gtm.Tail(mongo, &gtm.Options{
Expand Down

0 comments on commit 331db55

Please sign in to comment.