⚡ ♻️ redesign field indexes for better performance
 - only work with keys within the queried time-window
 - reduce memory footprint of the filtering function
linkdd committed Aug 23, 2024
1 parent d26cea5 commit e4be1de
Showing 5 changed files with 86 additions and 167 deletions.
44 changes: 33 additions & 11 deletions docs/design/storage.md
@@ -43,19 +43,18 @@ stream:test

### Field Index

-Each field of a log entry is indexed by adding the log entry's key in a list,
-stored at the following key:
+Each field of a log entry is indexed by creating a new key referencing the
+field and the associated log entry's key:

```
-index:<stream name>:field:<field name>:<field value>
+index:<stream name>:field:<field name>:<base64 encoded field value>:<entry key>
```

Example:

```
-index:test:field:foo:bar = [
-  entry:test:00000001724140167373:057804d1-832f-45bf-8e70-7acbf22ec480
-]
+# base64(bar) = YmFy
+index:test:field:foo:YmFy:entry:test:00000001724140167373:057804d1-832f-45bf-8e70-7acbf22ec480
```
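
For illustration, here is a minimal, self-contained Go sketch of how such a key can be composed. The helper name and the standalone `main` package are invented for the example; the actual implementation lives in `internal/storage/field_index.go` (see the diff further down). The value is base64-encoded, presumably so arbitrary field values cannot collide with the `:` separators used in the key layout.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// buildFieldIndexKey is illustrative only: it mirrors the key layout described
// above, with the field value base64-encoded before being spliced into the key.
func buildFieldIndexKey(stream, field, value, entryKey string) string {
	encoded := base64.StdEncoding.EncodeToString([]byte(value))
	return fmt.Sprintf("index:%s:field:%s:%s:%s", stream, field, encoded, entryKey)
}

func main() {
	fmt.Println(buildFieldIndexKey(
		"test", "foo", "bar",
		"entry:test:00000001724140167373:057804d1-832f-45bf-8e70-7acbf22ec480",
	))
	// index:test:field:foo:YmFy:entry:test:00000001724140167373:057804d1-832f-45bf-8e70-7acbf22ec480
}
```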

## Querying

@@ -83,12 +82,35 @@ stream:<stream name>:<to timestamp>:

Because of the internal structure of *BadgerDB*, this operation is fast.

-Then, if a [filter](../guides/filtering.md) is given, we fetch the indexes that
-match the filter by iterating over keys with the following prefixes:
+Then, if a [filter](../guides/filtering.md) is given:
+
+For `foo = "bar"` filters, we fetch all keys within the time-window with the
+following prefix:

```
-index:<stream name>:field:<field name>:
+index:<stream name>:field:foo:YmFy:
```

-Once we got the complete list of log entry keys to fetch, we sort them and fetch
-the actual records.
+For `foo in ["bar", "baz"]` filters, we fetch all keys within the time-window
+with the following prefixes:
+
+```
+# base64(bar) = YmFy
+# base64(baz) = YmF6
+index:<stream name>:field:foo:YmFy:
+index:<stream name>:field:foo:YmF6:
+```
+
+For `not <sub-filter>` filters, we select all keys in the time-window that do
+not appear in the `<sub-filter>` result.
+
+> **NB:**
+> - `field != value` is desugared into `not (field = value)`
+> - `field not in [value, value]` is desugared into `not (field in [value, value])`
+
+For `<lhs> or <rhs>` filters, we select the union of `<lhs>` and `<rhs>`
+results.
+
+For `<lhs> and <rhs>` filters, we select the intersection of `<lhs>` and `<rhs>`
+results.
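
To make the evaluation concrete, here is a hedged sketch of how a compound filter such as `foo = "bar" and not (level = "debug")` resolves to entry keys. It reuses the real helper names from `internal/storage/helpers.go` (their assumed shapes are sketched further below), but the entry keys and the per-prefix result sets are invented; in the actual code they come from BadgerDB prefix scans.

```go
// Illustrative only: in the real code the two "scan" sets below are produced
// by prefix iterations over index:test:field:foo:YmFy: and
// index:test:field:level:ZGVidWc=: restricted to the queried time-window.
func exampleCompoundFilter() []string {
	// Keys returned by the time-window scan (invented values).
	timeKeys := []string{"k1", "k2", "k3"}
	allKeysMap := sliceToMap(timeKeys)

	// foo = "bar" matched k1 and k2.
	fooBar := map[string]struct{}{"k1": {}, "k2": {}}

	// level = "debug" matched k2.
	levelDebug := map[string]struct{}{"k2": {}}

	// foo = "bar" and not (level = "debug")
	// -> intersect the lhs with (time-window minus the negated sub-filter).
	result := intersectKeysMap(fooBar, differenceKeysMap(allKeysMap, levelDebug))
	return mapToSlice(result) // only k1 remains
}
```
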
68 changes: 22 additions & 46 deletions internal/storage/field_index.go
@@ -1,68 +1,44 @@
package storage

import (
-	"encoding/json"
+	"encoding/base64"
	"fmt"

	"github.com/dgraph-io/badger/v3"
)

type fieldIndex struct {
-	txn *badger.Txn
-	key []byte
+	txn       *badger.Txn
+	keyPrefix []byte
}

func newFieldIndex(txn *badger.Txn, stream, field, value string) *fieldIndex {
+	encodedValue := base64.StdEncoding.EncodeToString([]byte(value))
+
	return &fieldIndex{
		txn: txn,
-		key: []byte(fmt.Sprintf("index:%s:field:%s:%s", stream, field, value)),
+		keyPrefix: []byte(fmt.Sprintf(
+			"index:%s:field:%s:%s:",
+			stream, field, encodedValue,
+		)),
	}
}

func (index *fieldIndex) AddKey(entryKey []byte) error {
-	fieldKeys, err := index.load()
-	if err != nil {
-		return err
-	}
-
-	fieldKeys = append(fieldKeys, string(entryKey))
-
-	return index.store(fieldKeys)
-}
-
-func (index *fieldIndex) GetKeys() ([]string, error) {
-	return index.load()
-}
-
-func (index *fieldIndex) load() ([]string, error) {
-	fieldKeys := []string{}
-
-	fieldIndexValue, err := index.txn.Get(index.key)
-	if err == badger.ErrKeyNotFound {
-		return fieldKeys, nil
-	} else if err != nil {
-		return nil, err
-	}
-
-	err = fieldIndexValue.Value(func(val []byte) error {
-		return json.Unmarshal(val, &fieldKeys)
-	})
-	if err != nil {
-		return nil, &UnmarshalError{Reason: err}
-	}
-
-	return fieldKeys, nil
+	indexKey := []byte(fmt.Sprintf("%s%s", index.keyPrefix, entryKey))
+	return index.txn.Set(indexKey, []byte{})
}

-func (index *fieldIndex) store(fieldKeys []string) error {
-	fieldKeysEncoded, err := json.Marshal(fieldKeys)
-	if err != nil {
-		return &MarshalError{Reason: err}
-	}
-
-	if err := index.txn.Set(index.key, fieldKeysEncoded); err != nil {
-		return err
-	}
-
-	return nil
-}
+func (index *fieldIndex) IterKeys(fn func(key string)) {
+	opts := badger.DefaultIteratorOptions
+	opts.PrefetchValues = false
+	opts.Prefix = index.keyPrefix
+	it := index.txn.NewIterator(opts)
+	defer it.Close()
+
+	for it.Rewind(); it.Valid(); it.Next() {
+		indexKey := it.Item().Key()
+		entryKey := string(indexKey[len(index.keyPrefix):])
+		fn(entryKey)
+	}
+}
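
With the redesign, indexing becomes a blind write of one key per `(field value, entry)` pair, and lookups become a pure prefix scan. The sketch below shows how the new type might be exercised inside the `storage` package; the on-disk path and the surrounding function are invented for the example, and only `newFieldIndex`, `AddKey`, and `IterKeys` come from the diff above.

```go
// Hypothetical usage example, not part of the repository.
func exampleFieldIndexUsage() error {
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-example"))
	if err != nil {
		return err
	}
	defer db.Close()

	return db.Update(func(txn *badger.Txn) error {
		idx := newFieldIndex(txn, "test", "foo", "bar")

		// Indexing now writes a single key with an empty value.
		entryKey := "entry:test:00000001724140167373:057804d1-832f-45bf-8e70-7acbf22ec480"
		if err := idx.AddKey([]byte(entryKey)); err != nil {
			return err
		}

		// Reading back is a prefix scan; no values are ever fetched,
		// which is where the reduced memory footprint comes from.
		idx.IterKeys(func(key string) {
			fmt.Println("matched entry:", key)
		})

		return nil
	})
}
```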
109 changes: 30 additions & 79 deletions internal/storage/fields_index.go
@@ -1,8 +1,6 @@
package storage

import (
-	"fmt"
-
	"github.com/dgraph-io/badger/v3"
)

@@ -18,116 +16,69 @@ func newFieldsIndex(txn *badger.Txn, stream string) *fieldsIndex {
	}
}

-func (index *fieldsIndex) Filter(filter Filter) ([]string, error) {
-	var evaluate func(f Filter) (map[string]struct{}, error)
-	evaluate = func(f Filter) (map[string]struct{}, error) {
+func (index *fieldsIndex) Filter(filter Filter, allKeys []string) []string {
+	allKeysMap := sliceToMap(allKeys)
+
+	var evaluate func(f Filter) map[string]struct{}
+	evaluate = func(f Filter) map[string]struct{} {
		switch f := f.(type) {
		case *AndFilter:
-			keys, err := evaluate(f.Filters[0])
-			if err != nil {
-				return nil, err
-			}
+			keys := evaluate(f.Filters[0])

			for _, subFilter := range f.Filters[1:] {
-				subKeys, err := evaluate(subFilter)
-				if err != nil {
-					return nil, err
-				}
-
+				subKeys := evaluate(subFilter)
				keys = intersectKeysMap(keys, subKeys)
			}

-			return keys, nil
+			return keys

		case *OrFilter:
			keys := map[string]struct{}{}

			for _, subFilter := range f.Filters {
-				subKeys, err := evaluate(subFilter)
-				if err != nil {
-					return nil, err
-				}
-
+				subKeys := evaluate(subFilter)
				keys = unionKeysMap(keys, subKeys)
			}

-			return keys, nil
+			return keys

		case *NotFilter:
-			keys, err := evaluate(f.Filter)
-			if err != nil {
-				return nil, err
-			}
-
-			allKeys, err := index.getAllKeys()
-			if err != nil {
-				return nil, err
-			}
-
-			return differenceKeysMap(allKeys, keys), nil
+			keys := evaluate(f.Filter)
+			return differenceKeysMap(allKeysMap, keys)

		case *FieldExact:
			fieldIndex := newFieldIndex(index.txn, index.stream, f.Field, f.Value)
-			keys, err := fieldIndex.GetKeys()
-			if err != nil {
-				return nil, err
-			}
-
-			return sliceToMap(keys), nil
+			keys := map[string]struct{}{}
+
+			fieldIndex.IterKeys(func(key string) {
+				if _, found := allKeysMap[key]; found {
+					keys[key] = struct{}{}
+				}
+			})
+
+			return keys

		case *FieldIn:
			keys := map[string]struct{}{}

			for _, value := range f.Values {
				fieldIndex := newFieldIndex(index.txn, index.stream, f.Field, value)
-				valueKeys, err := fieldIndex.GetKeys()
-				if err != nil {
-					return nil, err
-				}
-
-				for _, key := range valueKeys {
-					keys[key] = struct{}{}
-				}
+				fieldIndex.IterKeys(func(key string) {
+					if _, found := allKeysMap[key]; found {
+						keys[key] = struct{}{}
					}
+				})
			}

-			return keys, nil
+			return keys

		default:
-			return map[string]struct{}{}, nil
+			return map[string]struct{}{}
		}
	}

-	keys, err := evaluate(filter)
-	if err != nil {
-
-		return nil, err
-	}
-
-	return mapToSlice(keys), err
-}
-
-func (index *fieldsIndex) GetKeys() ([]string, error) {
-	keys, err := index.getAllKeys()
-	if err != nil {
-		return nil, err
-	}
-
-	return mapToSlice(keys), nil
-}
-
-func (index *fieldsIndex) getAllKeys() (map[string]struct{}, error) {
-	keys := map[string]struct{}{}
-
-	opts := badger.DefaultIteratorOptions
-	opts.PrefetchValues = false
-	opts.Prefix = []byte(fmt.Sprintf("entry:%s:", index.stream))
-	it := index.txn.NewIterator(opts)
-	defer it.Close()
-
-	for it.Rewind(); it.Valid(); it.Next() {
-		key := string(it.Item().KeyCopy(nil))
-		keys[key] = struct{}{}
-	}
-
-	return keys, nil
+	keys := evaluate(filter)
+	return mapToSlice(keys)
}
7 changes: 0 additions & 7 deletions internal/storage/helpers.go
@@ -14,13 +14,6 @@ func intersectKeysMap(a, b map[string]struct{}) map[string]struct{} {
	return result
}

-func intersectKeysList(a, b []string) []string {
-	ma := sliceToMap(a)
-	mb := sliceToMap(b)
-	mres := intersectKeysMap(ma, mb)
-	return mapToSlice(mres)
-}
-
func unionKeysMap(a, b map[string]struct{}) map[string]struct{} {
	result := map[string]struct{}{}

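
The `sliceToMap`, `mapToSlice`, and `differenceKeysMap` helpers referenced by `fields_index.go` are not shown in this diff. Assuming they follow the same shape as `intersectKeysMap` above, they presumably look roughly like this sketch:

```go
// Assumed shapes, inferred from how the helpers are called; not shown in the diff.
func sliceToMap(keys []string) map[string]struct{} {
	result := make(map[string]struct{}, len(keys))
	for _, key := range keys {
		result[key] = struct{}{}
	}
	return result
}

func mapToSlice(keys map[string]struct{}) []string {
	result := make([]string, 0, len(keys))
	for key := range keys {
		result = append(result, key)
	}
	return result
}

func differenceKeysMap(a, b map[string]struct{}) map[string]struct{} {
	result := map[string]struct{}{}
	for key := range a {
		if _, found := b[key]; !found {
			result[key] = struct{}{}
		}
	}
	return result
}
```
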
25 changes: 1 addition & 24 deletions internal/storage/main.go
@@ -129,14 +129,6 @@ func (s *Storage) Query(

		if key < toPrefix {
			timeKeys = append(timeKeys, key)
-
-			slog.DebugContext(
-				ctx,
-				"Log entry key added to queryset based on time index",
-				"channel", "storage",
-				"stream", stream,
-				"key", key,
-			)
		} else {
			break
		}
@@ -145,22 +137,7 @@
	var filteredKeys []string
	if filter != nil {
		fieldsIndex := newFieldsIndex(txn, stream)
-		fieldKeys, err := fieldsIndex.Filter(filter)
-		if err != nil {
-			return &QueryError{Operation: "fields-index", Reason: err}
-		}
-
-		for _, key := range fieldKeys {
-			slog.DebugContext(
-				ctx,
-				"Log entry key added to queryset based on field index",
-				"channel", "storage",
-				"stream", stream,
-				"key", key,
-			)
-		}
-
-		filteredKeys = intersectKeysList(timeKeys, fieldKeys)
+		filteredKeys = fieldsIndex.Filter(filter, timeKeys)
	} else {
		filteredKeys = timeKeys
	}
