From 6ccf33a0eceedbff6de276c3636933aafeb5c217 Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Sat, 7 Dec 2024 10:50:50 -0500 Subject: [PATCH] Various updates and improvements. --- go.mod | 5 +- go.sum | 80 +++++++++++++++------- monstache.go | 183 ++++++++++++++++++++++++--------------------------- 3 files changed, 146 insertions(+), 122 deletions(-) diff --git a/go.mod b/go.mod index 1b15a76..9dedb0f 100644 --- a/go.mod +++ b/go.mod @@ -6,12 +6,11 @@ require ( github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf github.com/evanphx/json-patch v5.6.0+incompatible github.com/fsnotify/fsnotify v1.5.1 - github.com/golang/snappy v0.0.4 // indirect github.com/klauspost/compress v1.15.1 // indirect github.com/olivere/elastic/v7 v7.0.31 github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f - github.com/rwynn/gtm/v2 v2.1.3 - go.mongodb.org/mongo-driver v1.13.1 + github.com/rwynn/gtm/v2 v2.1.4 + go.mongodb.org/mongo-driver v1.17.1 gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) diff --git a/go.sum b/go.sum index abc69d0..e567e73 100644 --- a/go.sum +++ b/go.sum @@ -38,8 +38,6 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -47,10 +45,10 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= @@ -63,8 +61,8 @@ github.com/klauspost/compress v1.15.1 h1:y9FcTHGyrebwfP0ZZqFiaxTaiDnUrGkJkI+f583 github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= -github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/olivere/elastic/v7 v7.0.31 h1:VJu9/zIsbeiulwlRCfGQf6Tzsr++uo+FeUgj5oj+xKk= github.com/olivere/elastic/v7 v7.0.31/go.mod h1:idEQxe7Es+Wr4XAuNnJdKeMZufkA9vQprOIFck061vg= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= @@ -75,8 +73,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f h1:a7clxaGmmqtdNTXyvrp/lVO/Gnkzlhc/+dLs5v965GM= github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f/go.mod h1:/mK7FZ3mFYEn9zvNPhpngTyatyehSwte5bJZ4ehL5Xw= -github.com/rwynn/gtm/v2 v2.1.3 h1:bbArJLs9rTQf59resLGl+fmfTWUkMvT/k9YHkjU0NGQ= -github.com/rwynn/gtm/v2 v2.1.3/go.mod h1:60y/nHkg4dTTcyixFxwgcDRU4XG5iwcK7sg9PmiJ+Hg= +github.com/rwynn/gtm/v2 v2.1.4 h1:jpQeOwt0GKT5XLxe4j6fMCtXmdz1+RJnNQROLrrmLVg= +github.com/rwynn/gtm/v2 v2.1.4/go.mod h1:wuZxyBzQssI70559SuCwAX4jBOfaWuUCzwTdlfMb8AA= github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b h1:h+3JX2VoWTFuyQEo87pStk/a99dzIO1mM9KxIyLPGTU= github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc= github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= @@ -93,25 +91,30 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= -github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= -github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= -github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.mongodb.org/mongo-driver v1.12.1/go.mod h1:/rGBTebI3XYboVmgz+Wv3Bcbl3aD0QF9zl6kDDw18rQ= -go.mongodb.org/mongo-driver v1.13.1 h1:YIc7HTYsKndGK4RFzJ3covLz1byri52x0IoMB0Pt/vk= -go.mongodb.org/mongo-driver v1.13.1/go.mod h1:wcDf1JBCXy2mOW0bWHwO/IOYqdca1MPCwDtFu/Z9+eo= +go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= -golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -120,17 +123,26 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -141,19 +153,39 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -161,8 +193,10 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/monstache.go b/monstache.go index 33e5870..575bcb3 100644 --- a/monstache.go +++ b/monstache.go @@ -11,7 +11,7 @@ import ( "errors" "flag" "fmt" - "io/ioutil" + "io" "log" "math" "net/http" @@ -83,8 +83,8 @@ var tmNamespaces = make(map[string]bool) var routingNamespaces = make(map[string]bool) var mux sync.Mutex -var chunksRegex = regexp.MustCompile("\\.chunks$") -var systemsRegex = regexp.MustCompile("system\\..+$") +var chunksRegex = regexp.MustCompile(`\.chunks$`) +var systemsRegex = regexp.MustCompile(`system\..+$`) var exitStatus = 0 const version = "6.7.17" @@ -140,6 +140,7 @@ type indexClient struct { gtmCtx *gtm.OpCtxMulti config *configOptions mongo *mongo.Client + mongoWriter *mongo.Client mongoConfig *mongo.Client bulk *elastic.BulkProcessor bulkStats *elastic.BulkProcessor @@ -319,6 +320,7 @@ type configOptions struct { EnableTemplate bool EnvDelimiter string MongoURL string `toml:"mongo-url"` + MongoWriteURL string `toml:"mongo-write-url"` MongoConfigURL string `toml:"mongo-config-url"` MongoOpLogDatabaseName string `toml:"mongo-oplog-database-name"` MongoOpLogCollectionName string `toml:"mongo-oplog-collection-name"` @@ -1058,7 +1060,7 @@ func (ic *indexClient) mapDataGolang(op *gtm.Op) error { } else { if output.Skip { op.Data = map[string]interface{}{} - } else if output.Passthrough == false { + } else if !output.Passthrough { if output.Document == nil { return errors.New("Map function must return a non-nil document") } @@ -1246,7 +1248,7 @@ func (ic *indexClient) processRelated(root *gtm.Op) (err error) { Data: op.Data, } ic.doDelete(rop) - q = append(q, rop) + q = append(q, cloneOp(rop)) continue } var srcData interface{} @@ -1304,24 +1306,7 @@ func (ic *indexClient) processRelated(root *gtm.Op) (err error) { continue } if processPlugin != nil { - pop := >m.Op{ - Id: rop.Id, - Operation: rop.Operation, - Namespace: rop.Namespace, - Source: rop.Source, - Timestamp: rop.Timestamp, - UpdateDescription: rop.UpdateDescription, - } - var data []byte - data, err = bson.Marshal(rop.Data) - if err == nil { - var m map[string]interface{} - err = bson.Unmarshal(data, &m) - if err == nil { - pop.Data = m - } - } - ic.processC <- pop + ic.processC <- cloneOp(rop) } skip := false if rs2 := relates[rop.Namespace]; len(rs2) != 0 { @@ -1336,7 +1321,7 @@ func (ic *indexClient) processRelated(root *gtm.Op) (err error) { } } if visit { - q = append(q, rop) + q = append(q, cloneOp(rop)) } } if !skip { @@ -1357,6 +1342,23 @@ func (ic *indexClient) processRelated(root *gtm.Op) (err error) { return } +func cloneOp(op *gtm.Op) *gtm.Op { + if op == nil { + return nil + } + clone := *op + var data []byte + data, err := bson.Marshal(op.Data) + if err == nil { + var m map[string]interface{} + err = bson.Unmarshal(data, &m) + if err == nil { + clone.Data = m + } + } + return &clone +} + func (ic *indexClient) prepareDataForIndexing(op *gtm.Op) { config := ic.config data := op.Data @@ -1557,7 +1559,7 @@ func (ic *indexClient) ensureClusterTTL() error { Keys: bson.M{"expireAt": 1}, Options: io, } - col := ic.mongo.Database(ic.config.ConfigDatabaseName).Collection("cluster") + col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("cluster") iv := col.Indexes() _, err := iv.CreateOne(context.Background(), im) return err @@ -1566,7 +1568,7 @@ func (ic *indexClient) ensureClusterTTL() error { func (ic *indexClient) enableProcess() (bool, error) { var err error var host string - col := ic.mongo.Database(ic.config.ConfigDatabaseName).Collection("cluster") + col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("cluster") findOneOpts := options.FindOne().SetProjection(bson.M{"_id": 1}) sr := col.FindOne(context.Background(), bson.M{"_id": ic.config.ResumeName}, findOneOpts) err = sr.Err() @@ -1598,7 +1600,7 @@ func (ic *indexClient) enableProcess() (bool, error) { } func (ic *indexClient) resetClusterState() error { - col := ic.mongo.Database(ic.config.ConfigDatabaseName).Collection("cluster") + col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("cluster") _, err := col.DeleteOne(context.Background(), bson.M{"_id": ic.config.ResumeName}) return err } @@ -1660,7 +1662,7 @@ func (ic *indexClient) saveTokens() error { if len(ic.tokens) == 0 { return err } - col := ic.mongo.Database(ic.config.ConfigDatabaseName).Collection("tokens") + col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("tokens") bwo := options.BulkWrite().SetOrdered(false) var models []mongo.WriteModel for streamID, token := range ic.tokens { @@ -1687,7 +1689,7 @@ func (ic *indexClient) saveTokens() error { } func (ic *indexClient) saveTimestamp() error { - col := ic.mongo.Database(ic.config.ConfigDatabaseName).Collection("monstache") + col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("monstache") doc := map[string]interface{}{ "ts": ic.lastTs, } @@ -1743,7 +1745,7 @@ func (ic *indexClient) filterDirectReadNamespaces(wanted []string) (results []st } func (ic *indexClient) saveDirectReadNamespaces() (err error) { - col := ic.mongo.Database(ic.config.ConfigDatabaseName).Collection("directreads") + col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("directreads") filter := bson.M{ "_id": ic.config.ResumeName, } @@ -1764,6 +1766,7 @@ func (config *configOptions) parseCommandLineFlags() *configOptions { flag.StringVar(&config.EnvDelimiter, "env-delimiter", ",", "A delimiter to use when splitting environment variable values") flag.StringVar(&config.MongoURL, "mongo-url", "", "MongoDB server or router server connection URL") flag.StringVar(&config.MongoConfigURL, "mongo-config-url", "", "MongoDB config server connection URL") + flag.StringVar(&config.MongoWriteURL, "mongo-write-url", "", "MongoDB connection URL for writing monstache state") flag.StringVar(&config.MongoOpLogDatabaseName, "mongo-oplog-database-name", "", "Override the database name which contains the mongodb oplog") flag.StringVar(&config.MongoOpLogCollectionName, "mongo-oplog-collection-name", "", "Override the collection name which contains the mongodb oplog") flag.StringVar(&config.GraylogAddr, "graylog-addr", "", "Send logs to a Graylog server at this address") @@ -1914,7 +1917,7 @@ func (config *configOptions) loadPipelines() { errorLog.Fatalln("Pipelines must specify path or script but not both") } if s.Path != "" { - if script, err := ioutil.ReadFile(s.Path); err == nil { + if script, err := os.ReadFile(s.Path); err == nil { s.Script = string(script[:]) } else { errorLog.Fatalf("Unable to load pipeline at path %s: %s", s.Path, err) @@ -1954,7 +1957,7 @@ func (config *configOptions) loadFilters() { errorLog.Fatalln("Filters must specify path or script but not both") } if s.Path != "" { - if script, err := ioutil.ReadFile(s.Path); err == nil { + if script, err := os.ReadFile(s.Path); err == nil { s.Script = string(script[:]) } else { errorLog.Fatalf("Unable to load filter at path %s: %s", s.Path, err) @@ -2000,7 +2003,7 @@ func jsStringFromBinData(call otto.FunctionCall) otto.Value { errorLog.Println("error could not convert bindata to type primitve.Binary") return otto.NullValue() } - s, _ := otto.ToValue(monstachemap.EncodeBinData(monstachemap.Binary{binData})) + s, _ := otto.ToValue(monstachemap.EncodeBinData(monstachemap.Binary{Binary: binData})) return s } @@ -2011,7 +2014,7 @@ func (config *configOptions) loadScripts() { errorLog.Fatalln("Scripts must specify path or script but not both") } if s.Path != "" { - if script, err := ioutil.ReadFile(s.Path); err == nil { + if script, err := os.ReadFile(s.Path); err == nil { s.Script = string(script[:]) } else { errorLog.Fatalf("Unable to load script at path %s: %s", s.Path, err) @@ -2058,9 +2061,9 @@ func (config *configOptions) loadPlugins() *configOptions { mapper, err := p.Lookup("Map") if err == nil { funcDefined = true - switch mapper.(type) { + switch mt := mapper.(type) { case func(*monstachemap.MapperPluginInput) (*monstachemap.MapperPluginOutput, error): - mapperPlugin = mapper.(func(*monstachemap.MapperPluginInput) (*monstachemap.MapperPluginOutput, error)) + mapperPlugin = mt default: errorLog.Fatalf("Plugin 'Map' function must be typed %T", mapperPlugin) } @@ -2068,9 +2071,9 @@ func (config *configOptions) loadPlugins() *configOptions { filter, err := p.Lookup("Filter") if err == nil { funcDefined = true - switch filter.(type) { + switch ft := filter.(type) { case func(*monstachemap.MapperPluginInput) (bool, error): - filterPlugin = filter.(func(*monstachemap.MapperPluginInput) (bool, error)) + filterPlugin = ft default: errorLog.Fatalf("Plugin 'Filter' function must be typed %T", filterPlugin) } @@ -2079,9 +2082,9 @@ func (config *configOptions) loadPlugins() *configOptions { process, err := p.Lookup("Process") if err == nil { funcDefined = true - switch process.(type) { + switch pt := process.(type) { case func(*monstachemap.ProcessPluginInput) error: - processPlugin = process.(func(*monstachemap.ProcessPluginInput) error) + processPlugin = pt default: errorLog.Fatalf("Plugin 'Process' function must be typed %T", processPlugin) } @@ -2089,9 +2092,9 @@ func (config *configOptions) loadPlugins() *configOptions { pipe, err := p.Lookup("Pipeline") if err == nil { funcDefined = true - switch pipe.(type) { + switch pt := pipe.(type) { case func(string, bool) ([]interface{}, error): - pipePlugin = pipe.(func(string, bool) ([]interface{}, error)) + pipePlugin = pt default: errorLog.Fatalf("Plugin 'Pipeline' function must be typed %T", pipePlugin) } @@ -2113,7 +2116,7 @@ func (config *configOptions) decodeAsTemplate() *configOptions { name, val := pair[0], pair[1] env[name] = val } - tpl, err := ioutil.ReadFile(config.ConfigFile) + tpl, err := os.ReadFile(config.ConfigFile) if err != nil { errorLog.Fatalln(err) } @@ -2156,6 +2159,9 @@ func (config *configOptions) loadConfigFile() *configOptions { if config.MongoConfigURL == "" { config.MongoConfigURL = tomlConfig.MongoConfigURL } + if config.MongoWriteURL == "" { + config.MongoWriteURL = tomlConfig.MongoWriteURL + } if config.MongoOpLogDatabaseName == "" { config.MongoOpLogDatabaseName = tomlConfig.MongoOpLogDatabaseName } @@ -2527,178 +2533,146 @@ func (config *configOptions) loadEnvironment() *configOptions { if config.MongoURL == "" { config.MongoURL = val } - break case "MONSTACHE_MONGO_CONFIG_URL": if config.MongoConfigURL == "" { config.MongoConfigURL = val } - break + case "MONSTACHE_MONGO_WRITE_URL": + if config.MongoWriteURL == "" { + config.MongoWriteURL = val + } case "MONSTACHE_MONGO_OPLOG_DB": if config.MongoOpLogDatabaseName == "" { config.MongoOpLogDatabaseName = val } - break case "MONSTACHE_MONGO_OPLOG_COL": if config.MongoOpLogCollectionName == "" { config.MongoOpLogCollectionName = val } - break case "MONSTACHE_ES_URLS": if len(config.ElasticUrls) == 0 { config.ElasticUrls = strings.Split(val, del) } - break case "MONSTACHE_ES_USER": if config.ElasticUser == "" { config.ElasticUser = val } - break case "MONSTACHE_ES_PASS": if config.ElasticPassword == "" { config.ElasticPassword = val } - break case "MONSTACHE_ES_PEM": if config.ElasticPemFile == "" { config.ElasticPemFile = val } - break case "MONSTACHE_ES_PKI_CERT": if config.ElasticPKIAuth.CertFile == "" { config.ElasticPKIAuth.CertFile = val } - break case "MONSTACHE_ES_PKI_KEY": if config.ElasticPKIAuth.KeyFile == "" { config.ElasticPKIAuth.KeyFile = val } - break case "MONSTACHE_ES_API_KEY": if config.ElasticAPIKey == "" { config.ElasticAPIKey = val } - break case "MONSTACHE_ES_VALIDATE_PEM": v, err := strconv.ParseBool(val) if err != nil { errorLog.Fatalf("Failed to load MONSTACHE_ES_VALIDATE_PEM: %s", err) } config.ElasticValidatePemFile = v - break case "MONSTACHE_WORKER": if config.Worker == "" { config.Worker = val } - break case "MONSTACHE_CLUSTER": if config.ClusterName == "" { config.ClusterName = val } - break case "MONSTACHE_DIRECT_READ_NS": if len(config.DirectReadNs) == 0 { config.DirectReadNs = strings.Split(val, del) } - break case "MONSTACHE_CHANGE_STREAM_NS": if len(config.ChangeStreamNs) == 0 { config.ChangeStreamNs = strings.Split(val, del) } - break case "MONSTACHE_DIRECT_READ_NS_DYNAMIC_EXCLUDE_REGEX": if config.DirectReadExcludeRegex == "" { config.DirectReadExcludeRegex = val } - break case "MONSTACHE_DIRECT_READ_NS_DYNAMIC_INCLUDE_REGEX": if config.DirectReadIncludeRegex == "" { config.DirectReadIncludeRegex = val } - break case "MONSTACHE_NS_REGEX": if config.NsRegex == "" { config.NsRegex = val } - break case "MONSTACHE_NS_EXCLUDE_REGEX": if config.NsExcludeRegex == "" { config.NsExcludeRegex = val } - break case "MONSTACHE_NS_DROP_REGEX": if config.NsDropRegex == "" { config.NsDropRegex = val } - break case "MONSTACHE_NS_DROP_EXCLUDE_REGEX": if config.NsDropExcludeRegex == "" { config.NsDropExcludeRegex = val } - break case "MONSTACHE_GRAYLOG_ADDR": if config.GraylogAddr == "" { config.GraylogAddr = val } - break case "MONSTACHE_AWS_ACCESS_KEY": config.AWSConnect.AccessKey = val - break case "MONSTACHE_AWS_SECRET_KEY": config.AWSConnect.SecretKey = val - break case "MONSTACHE_AWS_REGION": config.AWSConnect.Region = val - break case "MONSTACHE_LOG_DIR": config.Logs.Info = val + "/info.log" config.Logs.Warn = val + "/warn.log" config.Logs.Error = val + "/error.log" config.Logs.Trace = val + "/trace.log" config.Logs.Stats = val + "/stats.log" - break case "MONSTACHE_LOG_MAX_SIZE": i, err := strconv.ParseInt(val, 10, 64) if err != nil { errorLog.Fatalf("Failed to load MONSTACHE_LOG_MAX_SIZE: %s", err) } config.LogRotate.MaxSize = int(i) - break case "MONSTACHE_LOG_MAX_BACKUPS": i, err := strconv.ParseInt(val, 10, 64) if err != nil { errorLog.Fatalf("Failed to load MONSTACHE_LOG_MAX_BACKUPS: %s", err) } config.LogRotate.MaxBackups = int(i) - break case "MONSTACHE_LOG_MAX_AGE": i, err := strconv.ParseInt(val, 10, 64) if err != nil { errorLog.Fatalf("Failed to load MONSTACHE_LOG_MAX_AGE: %s", err) } config.LogRotate.MaxAge = int(i) - break case "MONSTACHE_HTTP_ADDR": if config.HTTPServerAddr == "" { config.HTTPServerAddr = val } - break case "MONSTACHE_FILE_NS": if len(config.FileNamespaces) == 0 { config.FileNamespaces = strings.Split(val, del) } - break case "MONSTACHE_PATCH_NS": if len(config.PatchNamespaces) == 0 { config.PatchNamespaces = strings.Split(val, del) } - break case "MONSTACHE_TIME_MACHINE_NS": if len(config.TimeMachineNamespaces) == 0 { config.TimeMachineNamespaces = strings.Split(val, del) } - break - default: - continue } } return config @@ -2711,7 +2685,7 @@ func (config *configOptions) loadVariableValueFromFile(name string, path string) return name, "", fmt.Errorf("read value for %s from file failed: %s", name, err) } defer f.Close() - c, err := ioutil.ReadAll(f) + c, err := io.ReadAll(f) if err != nil { return name, "", fmt.Errorf("read value for %s from file failed: %s", name, err) } @@ -2961,7 +2935,7 @@ func (config *configOptions) NewHTTPClient() (client *http.Client, err error) { if config.ElasticPemFile != "" { var ca []byte certs := x509.NewCertPool() - if ca, err = ioutil.ReadFile(config.ElasticPemFile); err == nil { + if ca, err = os.ReadFile(config.ElasticPemFile); err == nil { if ok := certs.AppendCertsFromPEM(ca); !ok { errorLog.Printf("No certs parsed successfully from %s", config.ElasticPemFile) } @@ -3180,7 +3154,7 @@ func (ic *indexClient) doIndexing(op *gtm.Op) (err error) { if ic.hasFileContent(op) { ingestAttachment = op.Data["file"] != nil } - if ic.config.IndexAsUpdate && meta.Pipeline == "" && ingestAttachment == false { + if ic.config.IndexAsUpdate && meta.Pipeline == "" && !ingestAttachment { req := elastic.NewBulkUpdateRequest() req.UseEasyJSON(ic.config.EnableEasyJSON) req.Id(objectID) @@ -3263,7 +3237,7 @@ func (ic *indexClient) doIndexing(op *gtm.Op) (err error) { data[k] = v } data["_source_id"] = objectID - if ic.config.IndexOplogTime == false { + if !ic.config.IndexOplogTime { secs := int64(op.Timestamp.T) t := time.Unix(secs, 0).UTC() data[ic.config.OplogTsFieldName] = op.Timestamp @@ -3528,7 +3502,7 @@ func (ic *indexClient) doIndexStats() (err error) { func (ic *indexClient) dropDBMeta(db string) (err error) { if ic.config.DeleteStrategy == statefulDeleteStrategy { - col := ic.mongo.Database(ic.config.ConfigDatabaseName).Collection("meta") + col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("meta") q := bson.M{"db": db} _, err = col.DeleteMany(context.Background(), q) } @@ -3537,7 +3511,7 @@ func (ic *indexClient) dropDBMeta(db string) (err error) { func (ic *indexClient) dropCollectionMeta(namespace string) (err error) { if ic.config.DeleteStrategy == statefulDeleteStrategy { - col := ic.mongo.Database(ic.config.ConfigDatabaseName).Collection("meta") + col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("meta") q := bson.M{"namespace": namespace} _, err = col.DeleteMany(context.Background(), q) } @@ -3606,7 +3580,7 @@ func (meta *indexingMeta) shouldSave(config *configOptions) bool { func (ic *indexClient) setIndexMeta(namespace, id string, meta *indexingMeta) error { config := ic.config - col := ic.mongo.Database(config.ConfigDatabaseName).Collection("meta") + col := ic.mongoWriter.Database(config.ConfigDatabaseName).Collection("meta") metaID := fmt.Sprintf("%s.%s", namespace, id) doc := map[string]interface{}{ "id": meta.ID, @@ -4333,7 +4307,7 @@ func (config *configOptions) makeShardInsertHandler() gtm.ShardInsertHandler { } } -func buildPipe(config *configOptions) func(string, bool) ([]interface{}, error) { +func buildPipe() func(string, bool) ([]interface{}, error) { if pipePlugin != nil { return pipePlugin } else if len(pipeEnvs) > 0 { @@ -4356,16 +4330,14 @@ func buildPipe(config *configOptions) func(string, bool) ([]interface{}, error) } else if data == val { return nil, errors.New("Exported pipeline function must return an array") } else { - switch data.(type) { + switch ds := data.(type) { case []map[string]interface{}: - ds := data.([]map[string]interface{}) var is []interface{} = make([]interface{}, len(ds)) for i, d := range ds { is[i] = deepExportValue(d) } return is, nil case []interface{}: - ds := data.([]interface{}) if len(ds) > 0 { errorLog.Fatalln("Pipeline function must return an array of objects") } @@ -4452,6 +4424,7 @@ func (ic *indexClient) setupBulk() { } func (ic *indexClient) run() { + ic.logVersionInfo() ic.startNotify() ic.setupFileIndexing() ic.setupBulk() @@ -4874,7 +4847,7 @@ func (ic *indexClient) buildGtmOptions() *gtm.Options { Token: token, Filter: filter, NamespaceFilter: nsFilter, - OpLogDisabled: config.EnableOplog == false, + OpLogDisabled: !config.EnableOplog, OpLogDatabaseName: config.MongoOpLogDatabaseName, OpLogCollectionName: config.MongoOpLogCollectionName, ChannelSize: config.GtmSettings.ChannelSize, @@ -4888,7 +4861,7 @@ func (ic *indexClient) buildGtmOptions() *gtm.Options { DirectReadNoTimeout: config.DirectReadNoTimeout, DirectReadFilter: directReadFilter, Log: infoLog, - Pipe: buildPipe(config), + Pipe: buildPipe(), ChangeStreamNs: config.ChangeStreamNs, DirectReadBounded: config.DirectReadBounded, MaxAwaitTime: ic.parseMaxAwaitTime(), @@ -5020,7 +4993,7 @@ func (ic *indexClient) eventLoop() { var err error var allOpsVisited bool timestampTicker := time.NewTicker(10 * time.Second) - if ic.config.Resume == false { + if !ic.config.Resume { timestampTicker.Stop() } heartBeat := time.NewTicker(10 * time.Second) @@ -5032,7 +5005,7 @@ func (ic *indexClient) eventLoop() { statsTimeout, _ = time.ParseDuration(ic.config.StatsDuration) } printStats := time.NewTicker(statsTimeout) - if ic.config.Stats == false { + if !ic.config.Stats { printStats.Stop() } infoLog.Println("Listening for events") @@ -5315,16 +5288,32 @@ func buildMongoClient(config *configOptions) *mongo.Client { errorLog.Fatalf("Unable to connect to MongoDB using URL %s: %s", cleanMongoURL(config.MongoURL), err) } + return mongoClient +} + +func (ic *indexClient) logVersionInfo() { infoLog.Printf("Started monstache version %s", version) infoLog.Printf("Go version %s", runtime.Version()) infoLog.Printf("MongoDB go driver %s", mongoversion.Driver) infoLog.Printf("Elasticsearch go driver %s", elastic.Version) - if mongoInfo, err := getBuildInfo(mongoClient); err == nil { + if mongoInfo, err := getBuildInfo(ic.mongo); err == nil { infoLog.Printf("Successfully connected to MongoDB version %s", mongoInfo.Version) - validateFeatures(config, mongoInfo) + validateFeatures(ic.config, mongoInfo) } else { infoLog.Println("Successfully connected to MongoDB") } +} + +func buildMongoWriterClient(config *configOptions, mongoReaderClient *mongo.Client) *mongo.Client { + if config.MongoWriteURL == "" { + return mongoReaderClient + + } + mongoClient, err := config.dialMongo(config.MongoWriteURL) + if err != nil { + errorLog.Fatalf("Unable to connect to MongoDB using write URL %s: %s", + cleanMongoURL(config.MongoWriteURL), err) + } return mongoClient } @@ -5355,6 +5344,7 @@ func main() { sh.start() mongoClient := buildMongoClient(config) + mongoWriterClient := buildMongoWriterClient(config, mongoClient) loadBuiltinFunctions(mongoClient, config) elasticClient := buildElasticClient(config) @@ -5362,6 +5352,7 @@ func main() { ic := &indexClient{ config: config, mongo: mongoClient, + mongoWriter: mongoWriterClient, client: elasticClient, fileWg: &sync.WaitGroup{}, indexWg: &sync.WaitGroup{},