feature(api): add refiller index goroutine for search index #907

Closed · wants to merge 8 commits
1 change: 1 addition & 0 deletions cmd/api/main.go
@@ -84,6 +84,7 @@ func main() {
databaseSettings := applicationConfig.Redis.GetSettings()
notificationHistorySettings := applicationConfig.NotificationHistory.GetSettings()
database := redis.NewDatabase(logger, databaseSettings, notificationHistorySettings, redis.API)
database.Flush()
Review comment (Member): "ой" ("oops")

// Start Index right before HTTP listener. Fail if index cannot start
searchIndex := index.NewSearchIndex(logger, database, telemetry.Metrics)
11 changes: 11 additions & 0 deletions database/redis/subscription.go
@@ -288,6 +288,17 @@ func (connector *DbConnector) getTriggersIdsByTags(tags []string) ([]string, err
return values, nil
}

func (connector *DbConnector) DeleteAllSubscriptions() {
subs := connector.Client().Keys(context.Background(), subscriptionKey("*"))
connector.Client().Del(context.Background(), subs.Val()...)

subs = connector.Client().Keys(context.Background(), userSubscriptionsKey("*"))
connector.Client().Del(context.Background(), subs.Val()...)

subs = connector.Client().Keys(context.Background(), teamSubscriptionsKey("*"))
connector.Client().Del(context.Background(), subs.Val()...)
}

func (connector *DbConnector) getSubscriptionTriggers(subscription *moira.SubscriptionData) ([]*moira.Trigger, error) {
if subscription == nil {
return make([]*moira.Trigger, 0), nil
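A note on the DeleteAllSubscriptions helper above: KEYS scans the whole keyspace in one blocking command, which is fine for a small test database but stalls Redis on a large one. If this helper is ever meant to run outside tests, an incremental SCAN is the usual non-blocking alternative. A minimal sketch, assuming a go-redis style client; deleteByPattern, delBatchSize and the package name are illustrative, not moira API:

package redisutil

import (
	"context"

	"github.com/go-redis/redis/v8" // adjust to the go-redis version the project pins
)

const delBatchSize = 500

// deleteByPattern removes every key matching pattern using SCAN, deleting in
// small batches so Redis is never blocked on a single long-running command.
func deleteByPattern(ctx context.Context, client redis.UniversalClient, pattern string) error {
	iter := client.Scan(ctx, 0, pattern, delBatchSize).Iterator()
	batch := make([]string, 0, delBatchSize)

	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		err := client.Del(ctx, batch...).Err()
		batch = batch[:0]
		return err
	}

	for iter.Next(ctx) {
		batch = append(batch, iter.Val())
		if len(batch) >= delBatchSize {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	if err := iter.Err(); err != nil {
		return err
	}
	return flush()
}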
18 changes: 12 additions & 6 deletions index/actualizer.go
@@ -10,6 +10,7 @@ const actualizerRunInterval = time.Second

func (index *Index) runIndexActualizer() error {
ticker := time.NewTicker(actualizerRunInterval)
defer ticker.Stop()
index.logger.Info().
Interface("actualizer_interval", actualizerRunInterval).
Msg("Start index actualizer: reindex changed triggers in loop with given interval")
@@ -20,6 +21,11 @@ func (index *Index) runIndexActualizer() error {
index.logger.Info().Msg("Stop index actualizer")
return nil
case <-ticker.C:
if !index.checkIfIndexIsReady() {
index.logger.Warning().
Msg("Cannot actualize triggers cause index is not ready")
continue
}
newTime := time.Now().Unix()
if float64(newTime-index.indexActualizedTS) > sweeperTimeToKeep.Seconds() {
index.logger.Error().
@@ -76,19 +82,19 @@ func (index *Index) actualizeIndex() error {
}

if len(triggersToDelete) > 0 {
- err2 := index.triggerIndex.Delete(triggersToDelete)
- if err2 != nil {
- return err2
+ err = index.triggerIndex.Delete(triggersToDelete)
+ if err != nil {
+ return err
}
log.Debug().
Int("triggers_count", len(triggersToDelete)).
Msg("Some triggers deleted")
}

if len(triggersToUpdate) > 0 {
- err2 := index.triggerIndex.Write(triggersToUpdate)
- if err2 != nil {
- return err2
+ err = index.triggerIndex.Write(triggersToUpdate)
+ if err != nil {
+ return err
}
log.Debug().
Int("triggers_count", len(triggersToUpdate)).
70 changes: 65 additions & 5 deletions index/batch.go
@@ -11,7 +11,67 @@ import (
func (index *Index) writeByBatches(triggerIDs []string, batchSize int) error {
triggerIDsBatches := moira.ChunkSlice(triggerIDs, batchSize)
triggerChecksChan, errorsChan := index.getTriggerChecksBatches(triggerIDsBatches)
- return index.handleTriggerBatches(triggerChecksChan, errorsChan, len(triggerIDs))
+ return index.writeTriggerBatches(triggerChecksChan, errorsChan, len(triggerIDs))
}

func (index *Index) deleteByBatches(triggerIDs []string, batchSize int) error {
triggerIDsBatches := moira.ChunkSlice(triggerIDs, batchSize)
triggerIDsChan := getChannelWithTriggerIDsBatches(triggerIDsBatches)
return index.deleteTriggerBatches(triggerIDsChan, len(triggerIDs))
}

func getChannelWithTriggerIDsBatches(triggerIDsBatches [][]string) <-chan []string {
ch := make(chan []string, len(triggerIDsBatches))

go func() {
defer close(ch)

for _, triggerIDsBatch := range triggerIDsBatches {
ch <- triggerIDsBatch
}
}()

return ch
}

func (index *Index) deleteTriggerBatches(triggerIDsChan <-chan []string, triggersTotal int) error {
indexErrors := make(chan error)
wg := &sync.WaitGroup{}
defer wg.Wait()
var count int64

for {
select {
case batch, ok := <-triggerIDsChan:
if !ok {
return nil
}

wg.Add(1)
go func(b []string) {
defer wg.Done()
err := index.triggerIndex.Delete(b)
atomic.AddInt64(&count, int64(len(b)))
if err != nil {
indexErrors <- err
return
}
index.logger.Debug().
Int("batch_size", len(batch)).
Int64("count", atomic.LoadInt64(&count)).
Int("triggers_total", triggersTotal).
Msg("Batch of triggers deleted from index")
}(batch)

case err, ok := <-indexErrors:
if ok {
index.logger.Error().
Error(err).
Msg("Cannot index trigger checks")
}
return err
}
}
}

func (index *Index) getTriggerChecksBatches(triggerIDsBatches [][]string) (triggerChecksChan chan []*moira.TriggerCheck, errors chan error) {
@@ -61,7 +121,7 @@ func (index *Index) getTriggerChecksWithRetries(batch []string) ([]*moira.Trigge
return nil, fmt.Errorf("cannot get trigger checks from DB after %d tries, last error: %s", triesCount, err.Error())
}

- func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.TriggerCheck, getTriggersErrors chan error, triggersTotal int) error {
+ func (index *Index) writeTriggerBatches(triggerChecksChan chan []*moira.TriggerCheck, getTriggersErrors chan error, triggersTotal int) error {
indexErrors := make(chan error)
wg := &sync.WaitGroup{}
defer wg.Wait()
@@ -76,10 +136,10 @@
wg.Add(1)
go func(b []*moira.TriggerCheck) {
defer wg.Done()
- err2 := index.triggerIndex.Write(b)
+ err := index.triggerIndex.Write(b)
atomic.AddInt64(&count, int64(len(b)))
- if err2 != nil {
- indexErrors <- err2
+ if err != nil {
+ indexErrors <- err
return
}
index.logger.Debug().
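One thing worth noting about deleteTriggerBatches above (and the writeTriggerBatches pattern it copies): errors are reported on an unbuffered indexErrors channel, but once triggerIDsChan is closed the function returns nil and nothing reads that channel anymore, so a batch goroutine that fails late blocks on the send and the deferred wg.Wait() never finishes. An errgroup folds the WaitGroup and the error channel into one primitive and removes that window. A sketch under the assumption that per-batch progress logging and tomb handling can be dropped; only index.triggerIndex.Delete is taken from the diff:

import "golang.org/x/sync/errgroup"

// deleteTriggerBatchesSketch deletes batches of trigger IDs concurrently.
// Wait returns the first error only after every goroutine has finished, so
// nothing is ever left blocked on an error channel.
func (index *Index) deleteTriggerBatchesSketch(triggerIDsChan <-chan []string) error {
	group := new(errgroup.Group)
	group.SetLimit(4) // illustrative bound on concurrent deletions

	for batch := range triggerIDsChan {
		batch := batch // per-iteration copy for pre-Go 1.22 loop semantics
		group.Go(func() error {
			return index.triggerIndex.Delete(batch)
		})
	}
	return group.Wait()
}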
5 changes: 5 additions & 0 deletions index/bleve/delete_test.go
@@ -92,4 +92,9 @@ func TestTriggerIndex_Delete(t *testing.T) {
So(err, ShouldBeNil)
})
})

Convey("Test close index", t, func() {
err = newIndex.Close()
So(err, ShouldBeNil)
})
}
2 changes: 1 addition & 1 deletion index/bleve/query.go
@@ -46,7 +46,7 @@ func buildQueryForOnlyErrors(onlyErrors bool) (searchQueries []query.Query) {
if !onlyErrors {
return
}
- minScore := float64(1)
+ minScore := 1.0
qr := bleve.NewNumericRangeQuery(&minScore, nil)
qr.FieldVal = mapping.TriggerLastCheckScore.GetName()
return append(searchQueries, qr)
12 changes: 8 additions & 4 deletions index/bleve/search.go
@@ -19,7 +19,11 @@ import (
func (index *TriggerIndex) Search(filterTags []string, searchString string, onlyErrors bool, page int64, size int64) (searchResults []*moira.SearchResult, total int64, err error) {
if size < 0 {
page = 0
- docs, _ := index.index.DocCount()
+ var docs uint64
+ docs, err = index.index.DocCount()
+ if err != nil {
+ return
+ }
size = int64(docs)
}

@@ -48,14 +52,14 @@ func getHighlights(fragmentsMap search.FieldFragmentMap, triggerFields ...mapping.FieldData) []moira.SearchHighlight {
func getHighlights(fragmentsMap search.FieldFragmentMap, triggerFields ...mapping.FieldData) []moira.SearchHighlight {
highlights := make([]moira.SearchHighlight, 0)
for _, triggerField := range triggerFields {
- var highlightValue string
+ var highlightValue strings.Builder
if fragments, ok := fragmentsMap[triggerField.GetName()]; ok {
for _, fragment := range fragments {
- highlightValue += fragment
+ highlightValue.WriteString(fragment)
}
highlights = append(highlights, moira.SearchHighlight{
Field: triggerField.GetTagValue(),
- Value: highlightValue,
+ Value: highlightValue.String(),
})
}
}
5 changes: 5 additions & 0 deletions index/bleve/search_test.go
@@ -246,6 +246,11 @@ func TestTriggerIndex_Search(t *testing.T) {
So(err, ShouldBeNil)
})
})

Convey("Test close index", t, func() {
err := newIndex.Close()
So(err, ShouldBeNil)
})
}

func TestStringsManipulations(t *testing.T) {
4 changes: 4 additions & 0 deletions index/bleve/trigger_index.go
@@ -31,3 +31,7 @@ func (index *TriggerIndex) GetCount() (int64, error) {
}
return int64(documents), nil
}

func (index *TriggerIndex) Close() error {
return index.index.Close()
}
9 changes: 8 additions & 1 deletion index/bleve/trigger_index_benchmark_test.go
@@ -49,7 +49,14 @@ func runBenchmark(b *testing.B, triggersSize int, batchSize int) {
b.ReportAllocs()

for n := 0; n < b.N; n++ {
- newIndex, _ := CreateTriggerIndex(triggerMapping)
+ newIndex, err := CreateTriggerIndex(triggerMapping)
+ if err != nil {
+ logger.Warning().
+ Msg("Cannot create new trigger index for benchmark test")
+ continue
+ }
+ defer newIndex.Close()

logger.Info().
Int("batch_size", batchSize).
Int("triggers_count", triggersSize).
5 changes: 5 additions & 0 deletions index/bleve/write_test.go
@@ -93,4 +93,9 @@ func TestTriggerIndex_Write(t *testing.T) {
So(err, ShouldBeNil)
})
})

Convey("Test close index", t, func() {
err = newIndex.Close()
So(err, ShouldBeNil)
})
}
3 changes: 3 additions & 0 deletions index/index.go
@@ -16,6 +16,7 @@ type TriggerIndex interface {
Write(checks []*moira.TriggerCheck) error
Delete(triggerIDs []string) error
GetCount() (int64, error)
Close() error
}

// Index represents Index for Bleve.Index type
@@ -64,6 +65,7 @@ func (index *Index) Start() error {
index.tomb.Go(index.runTriggersToReindexSweepper)
index.tomb.Go(index.checkIndexActualizationLag)
index.tomb.Go(index.checkIndexedTriggersCount)
index.tomb.Go(index.runIndexRefiller)

return nil
}
@@ -77,5 +79,6 @@ func (index *Index) IsReady() bool {
func (index *Index) Stop() error {
index.logger.Info().Msg("Stop search index")
index.tomb.Kill(nil)
index.triggerIndex.Close()
return index.tomb.Wait()
}
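Stop above drops the error returned by triggerIndex.Close(). Closing a bleve index can return an error, and since Stop already returns one from tomb.Wait, logging the close failure is cheap. A small sketch of the same Stop with the error surfaced, using the logger chain already seen elsewhere in this diff:

func (index *Index) Stop() error {
	index.logger.Info().Msg("Stop search index")
	index.tomb.Kill(nil)

	if err := index.triggerIndex.Close(); err != nil {
		index.logger.Error().
			Error(err).
			Msg("Cannot close trigger index")
	}

	return index.tomb.Wait()
}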
14 changes: 14 additions & 0 deletions index/metrics.go
@@ -6,11 +6,18 @@ import (

func (index *Index) checkIndexedTriggersCount() error {
checkTicker := time.NewTicker(time.Millisecond * 100) //nolint
defer checkTicker.Stop()

for {
select {
case <-index.tomb.Dying():
return nil
case <-checkTicker.C:
if !index.checkIfIndexIsReady() {
index.logger.Warning().
Msg("Cannot check indexed triggers count cause index is not ready")
continue
}
if documents, err := index.triggerIndex.GetCount(); err == nil {
index.metrics.IndexedTriggersCount.Update(documents)
}
@@ -20,11 +27,18 @@

func (index *Index) checkIndexActualizationLag() error {
checkTicker := time.NewTicker(time.Millisecond * 100) //nolint
defer checkTicker.Stop()

for {
select {
case <-index.tomb.Dying():
return nil
case <-checkTicker.C:
if !index.checkIfIndexIsReady() {
index.logger.Warning().
Msg("Cannot check index actualization lag cause index is not ready")
continue
}
index.metrics.IndexActualizationLag.UpdateSince(time.Unix(index.indexActualizedTS, 0))
}
}
73 changes: 73 additions & 0 deletions index/refiller.go
@@ -0,0 +1,73 @@
package index

import (
"fmt"
"time"
)

const refillerRunInterval = 30 * time.Minute

// const batchSizeForTest = 50 // TODO: DELETE BEFORE MERGE

func (index *Index) runIndexRefiller() error {
ticker := time.NewTicker(refillerRunInterval)
defer ticker.Stop()
index.logger.Info().
Interface("refilling_interval", refillerRunInterval).
Msg("Start refiller for search index")

for {
select {
case <-index.tomb.Dying():
index.logger.Info().Msg("Stop refilling search index")
return nil
case <-ticker.C:
index.logger.Info().Msg("Refill search index by timeout")

start := time.Now()
if err := index.Refill(); err != nil {
index.logger.Warning().
Error(err).
Msg("Cannot refill index")
continue
}
end := time.Now()
index.logger.Debug().
Msg(fmt.Sprintf("Refill took %v sec", end.Sub(start).Seconds()))
}
}
}

// Refill completely clears the index and then repopulates it. It is needed to clean up memory leaks that appear when updating or searching the index.
func (index *Index) Refill() error {
start := time.Now()
triggerIds, err := index.database.GetAllTriggerIDs()
if err != nil {
return err
}
end := time.Now()
index.logger.Debug().
Msg(fmt.Sprintf("Fetching all trigger ids from database took %v sec", end.Sub(start).Seconds()))

index.indexed = false
defer func() {
index.indexed = true
}()
start = time.Now()
if err := index.deleteByBatches(triggerIds, defaultIndexBatchSize); err != nil {
return err
}
end = time.Now()
index.logger.Debug().
Msg(fmt.Sprintf("Deleting all triggers from index took %v sec", end.Sub(start).Seconds()))

start = time.Now()
if err := index.fillIndex(); err != nil {
return err
}
end = time.Now()
index.logger.Debug().
Msg(fmt.Sprintf("Filling all triggers to index took %v sec", end.Sub(start).Seconds()))

return nil
}
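Refill above flips index.indexed to false and back, while the actualizer and metrics goroutines poll the flag through checkIfIndexIsReady. If indexed is still a plain bool (as the direct assignments suggest), it is now written by the refiller goroutine and read by others concurrently, which the race detector will flag. A minimal sketch of an atomic flag, assuming checkIfIndexIsReady reads the same field; the struct is abbreviated and everything except the names taken from the diff is illustrative:

import "sync/atomic"

type Index struct {
	// other fields from index.go elided
	indexed atomic.Bool // readiness flag, safe for concurrent use (Go 1.19+)
}

func (index *Index) checkIfIndexIsReady() bool {
	return index.indexed.Load()
}

// Refill would then toggle the flag atomically around the rebuild.
func (index *Index) Refill() error {
	index.indexed.Store(false)
	defer index.indexed.Store(true)

	// ...delete all documents and refill the index as in refiller.go above...
	return nil
}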