Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LRU Cache for DotPathToSlice #1890

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion internal/bloblang/parser/mapping_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@ import (
"strings"

"github.com/Jeffail/gabs/v2"
lru "github.com/hashicorp/golang-lru/v2"

"github.com/benthosdev/benthos/v4/internal/bloblang/mapping"
"github.com/benthosdev/benthos/v4/internal/bloblang/query"
)

var (
dotPathCache *lru.Cache[string, []string]
)

func init() {
var err error
if dotPathCache, err = lru.New[string, []string](1024); err != nil {
panic(err)
}
}

// ParseMapping parses a bloblang mapping and returns an executor to run it, or
// an error if the parsing fails.
//
Expand Down Expand Up @@ -461,7 +473,21 @@ func pathParser() Func {
if sequence[1] != nil {
pathParts := sequence[1].([]any)[1].(DelimitedResult).Primary
for _, p := range pathParts {
path = append(path, gabs.DotPathToSlice(p.(string))...)
var (
dotPathSlice []string
ok bool
)

pathString := p.(string)
dotPathSlice, ok = dotPathCache.Get(pathString)

if !ok {
dotPathSlice = gabs.DotPathToSlice(pathString)
dotPathCache.Add(pathString, dotPathSlice)
}

path = append(path, dotPathSlice...)

}
}

Expand Down
51 changes: 45 additions & 6 deletions internal/bloblang/query/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@ import (

"github.com/Jeffail/gabs/v2"
"github.com/gofrs/uuid"
lru "github.com/hashicorp/golang-lru/v2"
gonanoid "github.com/matoous/go-nanoid/v2"
"github.com/segmentio/ksuid"

"github.com/benthosdev/benthos/v4/internal/tracing"
)

var dotPathCache *lru.Cache[string, []string]

func init() {
var err error
if dotPathCache, err = lru.New[string, []string](1024); err != nil {
panic(err)
}
}

type fieldFunction struct {
namedContext string
fromRoot bool
Expand Down Expand Up @@ -104,8 +114,14 @@ func (f *fieldFunction) Close(ctx context.Context) error {
// return a field from a named context.
func NewNamedContextFieldFunction(namedContext, pathStr string) Function {
var path []string
var ok bool
if len(pathStr) > 0 {
path = gabs.DotPathToSlice(pathStr)
path, ok = dotPathCache.Get(pathStr)

if !ok {
path = gabs.DotPathToSlice(pathStr)
dotPathCache.Add(pathStr, path)
}
}
return &fieldFunction{namedContext: namedContext, fromRoot: false, path: path}
}
Expand All @@ -114,8 +130,14 @@ func NewNamedContextFieldFunction(namedContext, pathStr string) Function {
// current context.
func NewFieldFunction(pathStr string) Function {
var path []string
var ok bool
if len(pathStr) > 0 {
path = gabs.DotPathToSlice(pathStr)
path, ok = dotPathCache.Get(pathStr)

if !ok {
path = gabs.DotPathToSlice(pathStr)
dotPathCache.Add(pathStr, path)
}
}
return &fieldFunction{
path: path,
Expand All @@ -125,9 +147,17 @@ func NewFieldFunction(pathStr string) Function {
// NewRootFieldFunction creates a query function that returns a field from the
// root context.
func NewRootFieldFunction(pathStr string) Function {
var path []string
var (
path []string
ok bool
)
if len(pathStr) > 0 {
path = gabs.DotPathToSlice(pathStr)
path, ok = dotPathCache.Get(pathStr)

if !ok {
path = gabs.DotPathToSlice(pathStr)
dotPathCache.Add(pathStr, path)
}
}
return &fieldFunction{
fromRoot: true,
Expand Down Expand Up @@ -440,9 +470,18 @@ func jsonFunction(args *ParsedParams) (Function, error) {
if err != nil {
return nil, err
}
var argPath []string
var (
argPath []string
ok bool
)

if len(path) > 0 {
argPath = gabs.DotPathToSlice(path)
argPath, ok = dotPathCache.Get(path)

if !ok {
argPath = gabs.DotPathToSlice(path)
dotPathCache.Add(path, argPath)
}
}
return ClosureFunction("json path `"+SliceToDotPath(argPath...)+"`", func(ctx FunctionContext) (any, error) {
jPart, err := ctx.MsgBatch.Get(ctx.Index).AsStructured()
Expand Down
13 changes: 12 additions & 1 deletion internal/bloblang/query/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,18 @@ func (g *getMethod) QueryTargets(ctx TargetsContext) (TargetsContext, []TargetPa

// NewGetMethod creates a new get method.
func NewGetMethod(target Function, pathStr string) (Function, error) {
path := gabs.DotPathToSlice(pathStr)
var (
path []string
ok bool
)

path, ok = dotPathCache.Get(pathStr)

if !ok {
path = gabs.DotPathToSlice(pathStr)
dotPathCache.Add(pathStr, path)
}

switch t := target.(type) {
case *getMethod:
newPath := append([]string{}, t.path...)
Expand Down
41 changes: 38 additions & 3 deletions internal/bloblang/query/methods_structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,19 @@ var _ = registerSimpleMethod(
if err != nil {
return nil, err
}
path := gabs.DotPathToSlice(pathStr)

var (
path []string
ok bool
)

path, ok = dotPathCache.Get(pathStr)

if !ok {
path = gabs.DotPathToSlice(pathStr)
dotPathCache.Add(pathStr, path)
}

return func(v any, ctx FunctionContext) (any, error) {
return gabs.Wrap(v).Exists(path...), nil
}, nil
Expand Down Expand Up @@ -325,7 +337,18 @@ Exploding objects results in an object where the keys match the target object, a
if err != nil {
return nil, err
}
path := gabs.DotPathToSlice(pathRaw)
var (
path []string
ok bool
)

path, ok = dotPathCache.Get(pathRaw)

if !ok {
path = gabs.DotPathToSlice(pathRaw)
dotPathCache.Add(pathRaw, path)
}

return func(v any, ctx FunctionContext) (any, error) {
rootMap, ok := v.(map[string]any)
if !ok {
Expand Down Expand Up @@ -1690,7 +1713,19 @@ If a key within a nested path does not exist or is not an object then it is not
if err != nil {
return nil, fmt.Errorf("argument %v: %w", i, err)
}
excludeList = append(excludeList, gabs.DotPathToSlice(argStr))
var (
path []string
ok bool
)

path, ok = dotPathCache.Get(argStr)

if !ok {
path = gabs.DotPathToSlice(argStr)
dotPathCache.Add(argStr, path)
}

excludeList = append(excludeList, path)
}
return func(v any, ctx FunctionContext) (any, error) {
m, ok := v.(map[string]any)
Expand Down