Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: iavl/v2 alpha6 [DNM] #1043

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion v2/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,62 @@
# IAVL v2
# iavl/v2

IAVL v2 is performance minded rewrite of IAVL v1. Benchmarks show a 10-20x improvement in
throughput depending on the operation. The primary changes are:

- Checkpoints: periodic writes of dirty branch nodes to disk.
- Leaf changelog: leaf nodes are flushed to disk at every version.
- Replay: revert the tree to a previous version by replaying the leaf changelog.
- Sharding: shards are created on pruning events.
- BTree on disk: SQLite (a mature BTree implementation) is used for storage.
- Cache: the AVL tree is cached in memory and (non-dirty) nodes evicted by configurable policy.

## Concepts

### Checkpoints

A checkpoint writes all dirty branch nodes currently in memory since the last checkpoint to
disk. Checkpoints are distinct from shards. One shard may contain multiple checkpoints. A checkpoint occurs
at a configurable interval or when the dirty branch nodes exceed a threshold.

### Leaf Changelog

The leaf changelog is a list of leaf nodes that have been written since the last checkpoint. Inserts and
updates are in one table, deletes in another. They are ordered by a sequence number per version to allow for
deterministic replay. The also makes it possible to evict leafs from the tree and rely on SQLite's
page cache and memory map to manage efficient access for leaves.

### Replay

Replay is the process of reverting the tree to a previous version. Given a version v, the tree is loaded at
the check version m less than or equal to v. The leaf changelog is replayed from m to v. The tree is now at
version v.

This is useful for rolling back, or querying and proving the state of the tree at a previous version.

### Sharding

A shard contains all the changes to a tree from version m to version n. It may contain multiple checkpoints.

### BTree (SQLite)

Why SQLite? A B+Tree is a very efficient on disk data structure. The ideal implementation of IAVL on disk
would be to lay out nodes in subtrees chunks in the same format as the in-memory AVL tree. A B+Tree is a
as close an approximation to this as possible.

## Pruning

Parameters:

- invalidated ratio: the ratio of invalidated nodes to total nodes in a shard that triggers a
pruning event. The default is 1.5. Roughly correleates to disk size of a complete tree, where (2 * ratio) is the size of the pre preuned, tree on disk. A ratio of 1.5 means that 3x the initial size should be provisioned.
- minumum keep versions: the minimum number of versions to keep. This is a safety feature to
prevent pruning to a version that is too recent. The default is 100.

Pruning events only occur on checkpoint boundaries. The prune version is the most recent check
point less than or equal to the requested prune version.

On prune the latest shard is locked (readonly) and a new shard is created. The new shard is now
the hot shard and subsequent SaveVersion calls write leafs and branches to it.

Deletes happen by writing a new shard without orphans, updating the shard connection, then
dropping the old one.
155 changes: 155 additions & 0 deletions v2/cmd/bench/bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package bench

import (
"net/http"
"os"
"runtime/pprof"
"testing"
"time"

"github.com/cosmos/iavl/v2"
"github.com/cosmos/iavl/v2/metrics"
"github.com/cosmos/iavl/v2/testutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
)

func Command() *cobra.Command {
cmd := &cobra.Command{
Use: "bench",
Short: "run benchmarks",
}
cmd.AddCommand(benchCommand())
return cmd
}

func benchCommand() *cobra.Command {
var (
dbPath string
changelogPath string
loadSnapshot bool
usePrometheus bool
cpuProfile string
)
cmd := &cobra.Command{
Use: "std",
Short: "run the std development benchmark",
Long: `Runs a longer benchmark for the IAVL tree. This is useful for development and testing.
Pre-requisites this command:
$ go run ./cmd gen tree --db /tmp/iavl-v2 --limit 1 --type osmo-like-many
mkdir -p /tmp/osmo-like-many/v2 && go run ./cmd gen emit --start 2 --limit 1000 --type osmo-like-many --out /tmp/osmo-like-many/v2

Optional for --snapshot arg:
$ go run ./cmd snapshot --db /tmp/iavl-v2 --version 1
`,

RunE: func(_ *cobra.Command, _ []string) error {
if cpuProfile != "" {
f, err := os.Create(cpuProfile)
if err != nil {
return err
}
if err := pprof.StartCPUProfile(f); err != nil {
return err
}
defer func() {
pprof.StopCPUProfile()
f.Close()
}()
}
t := &testing.T{}
treeOpts := iavl.DefaultTreeOptions()
treeOpts.CheckpointInterval = 80
treeOpts.StateStorage = true
treeOpts.HeightFilter = 1
treeOpts.EvictionDepth = 22
treeOpts.PruneRatio = 0
treeOpts.MetricsProxy = metrics.NewStructMetrics()
if usePrometheus {
treeOpts.MetricsProxy = newPrometheusMetricsProxy()
}

var multiTree *iavl.MultiTree
if loadSnapshot {
pool := iavl.NewNodePool()
var err error
multiTree, err = iavl.ImportMultiTree(pool, 1, dbPath, treeOpts)
require.NoError(t, err)
} else {
multiTree = iavl.NewMultiTree(dbPath, treeOpts)
require.NoError(t, multiTree.MountTrees())
require.NoError(t, multiTree.LoadVersion(1))
require.NoError(t, multiTree.WarmLeaves())
}

opts := testutil.CompactedChangelogs(changelogPath)
opts.SampleRate = 250_000

// opts.Until = 1_000
// opts.UntilHash = "557663181d9ab97882ecfc6538e3b4cfe31cd805222fae905c4b4f4403ca5cda"
opts.Until = 500
opts.UntilHash = "2670bd5767e70f2bf9e4f723b5f205759e39afdb5d8cfb6b54a4a3ecc27a1377"

multiTree.TestBuild(t, opts)
return nil
},
}
cmd.Flags().StringVar(&dbPath, "db", "/tmp/iavl-v2", "the path to the database at version 1")
cmd.Flags().StringVar(&changelogPath, "changelog", "/tmp/osmo-like-many/v2", "the path to the changelog")
cmd.Flags().BoolVar(&loadSnapshot, "snapshot", false, "load the snapshot at version 1 before running the benchmarks (loads full tree into memory)")
cmd.Flags().BoolVar(&usePrometheus, "prometheus", false, "enable prometheus metrics")
cmd.Flags().StringVar(&cpuProfile, "cpu-profile", "", "write cpu profile to file")

if err := cmd.MarkFlagRequired("changelog"); err != nil {
panic(err)
}
if err := cmd.MarkFlagRequired("db"); err != nil {
panic(err)
}
return cmd
}

var _ metrics.Proxy = &prometheusMetricsProxy{}

type prometheusMetricsProxy struct {
workingSize prometheus.Gauge
workingBytes prometheus.Gauge
}

func newPrometheusMetricsProxy() *prometheusMetricsProxy {
p := &prometheusMetricsProxy{}
p.workingSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "iavl_working_size",
Help: "working size",
})
p.workingBytes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "iavl_working_bytes",
Help: "working bytes",
})
http.Handle("/metrics", promhttp.Handler())
go func() {
err := http.ListenAndServe(":2112", nil)
if err != nil {
panic(err)
}
}()
Comment on lines +133 to +138
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add proper HTTP server shutdown handling.

The HTTP server is started in a goroutine without proper shutdown handling. This could lead to resource leaks.

Consider implementing graceful shutdown:

+var srv *http.Server
+
 func newPrometheusMetricsProxy() *prometheusMetricsProxy {
     // ... existing code ...
+    srv = &http.Server{
+        Addr:    ":2112",
+        Handler: nil,
+    }
     go func() {
-        err := http.ListenAndServe(":2112", nil)
+        err := srv.ListenAndServe()
         if err != nil && err != http.ErrServerClosed {
             panic(err)
         }
     }()
     return p
 }
+
+func (p *prometheusMetricsProxy) Close() error {
+    if srv != nil {
+        return srv.Close()
+    }
+    return nil
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
go func() {
err := http.ListenAndServe(":2112", nil)
if err != nil {
panic(err)
}
}()
var srv *http.Server
func newPrometheusMetricsProxy() *prometheusMetricsProxy {
// ... existing code ...
srv = &http.Server{
Addr: ":2112",
Handler: nil,
}
go func() {
err := srv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
panic(err)
}
}()
return p
}
func (p *prometheusMetricsProxy) Close() error {
if srv != nil {
return srv.Close()
}
return nil
}

return p
}

func (p *prometheusMetricsProxy) IncrCounter(_ float32, _ ...string) {
}

func (p *prometheusMetricsProxy) SetGauge(val float32, keys ...string) {
k := keys[1]
switch k {
case "working_size":
p.workingSize.Set(float64(val))
case "working_bytes":
p.workingBytes.Set(float64(val))
}
}

func (p *prometheusMetricsProxy) MeasureSince(_ time.Time, _ ...string) {}
32 changes: 19 additions & 13 deletions v2/cmd/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ import (
"github.com/dustin/go-humanize"
"github.com/kocubinski/costor-api/compact"
"github.com/kocubinski/costor-api/core"
"github.com/rs/zerolog"
zlog "github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)

var log = iavl.NewTestLogger()
var log = zlog.Output(zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: time.Stamp,
})

func Command() *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -51,7 +56,7 @@ func emitCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "emit",
Short: "emit generated changesets to disk",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, _ []string) error {
itr, err := getChangesetIterator(typ)
if err != nil {
return err
Expand All @@ -70,10 +75,9 @@ func emitCommand() *cobra.Command {
go func() {
stats, err := stream.Compact()
if err != nil {
log.Error("failed to compact", "error", err)
os.Exit(1)
log.Fatal().Err(err).Msg("failed to compact")
}
log.Info(stats.Report())
log.Info().Msg(stats.Report())
wg.Done()
}()

Expand All @@ -91,13 +95,13 @@ func emitCommand() *cobra.Command {

if itr.Version() < int64(start) {
if cnt%5_000_000 == 0 {
log.Info(fmt.Sprintf("fast forward version=%d nodes=%s", itr.Version(), humanize.Comma(cnt)))
log.Info().Msgf("fast forward version=%d nodes=%s", itr.Version(), humanize.Comma(cnt))
}
continue
}

if cnt%500_000 == 0 {
log.Info(fmt.Sprintf("version=%d nodes=%s", itr.Version(), humanize.Comma(cnt)))
log.Info().Msgf("version=%d nodes=%s", itr.Version(), humanize.Comma(cnt))
}

select {
Expand Down Expand Up @@ -144,12 +148,12 @@ func treeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "tree",
Short: "build and save a Tree to disk, taking generated changesets as input",
RunE: func(cmd *cobra.Command, args []string) error {
multiTree := iavl.NewMultiTree(iavl.NewTestLogger(), dbPath, iavl.TreeOptions{StateStorage: true})
RunE: func(_ *cobra.Command, _ []string) error {
multiTree := iavl.NewMultiTree(dbPath, iavl.TreeOptions{StateStorage: true})
defer func(mt *iavl.MultiTree) {
err := mt.Close()
if err != nil {
log.Error("failed to close db", "error", err)
log.Error().Err(err).Msg("failed to close db")
}
}(multiTree)

Expand Down Expand Up @@ -199,12 +203,12 @@ func treeCommand() *cobra.Command {

i++
if i%100_000 == 0 {
log.Info(fmt.Sprintf("leaves=%s dur=%s rate=%s version=%d",
log.Info().Msgf("leaves=%s dur=%s rate=%s version=%d",
humanize.Comma(i),
time.Since(start),
humanize.Comma(int64(100_000/time.Since(start).Seconds())),
itr.Version(),
))
)
start = time.Now()
}
}
Expand All @@ -215,7 +219,7 @@ func treeCommand() *cobra.Command {
}
}

log.Info(fmt.Sprintf("last version=%d hash=%x", lastVersion, lastHash))
log.Info().Msgf("last version=%d hash=%x", lastVersion, lastHash)

return nil
},
Expand All @@ -228,3 +232,5 @@ func treeCommand() *cobra.Command {
cmd.Flags().Int64Var(&limit, "limit", -1, "the version (inclusive) to halt generation at. -1 means no limit")
return cmd
}

// pre-requisites this command
14 changes: 10 additions & 4 deletions v2/cmd/rollback/rollback.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package rollback

import (
"fmt"
"os"
"time"

"github.com/cosmos/iavl/v2"
"github.com/rs/zerolog"
zlog "github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)

var log = iavl.NewTestLogger()
var log = zlog.Output(zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: time.Stamp,
})

func Command() *cobra.Command {
var (
Expand All @@ -17,13 +23,13 @@ func Command() *cobra.Command {
cmd := &cobra.Command{
Use: "rollback",
Short: "Rollback IAVL to a previous version",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
dbPaths, err := iavl.FindDbsInPath(path)
if err != nil {
return err
}
for _, dbPath := range dbPaths {
log.Info(fmt.Sprintf("revert db %s to version %d", dbPath, version))
log.Info().Msgf("revert db %s to version %d", dbPath, version)
sql, err := iavl.NewSqliteDb(iavl.NewNodePool(), iavl.SqliteDbOptions{Path: dbPath})
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions v2/cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"github.com/cosmos/iavl/v2/cmd/bench"
"github.com/cosmos/iavl/v2/cmd/gen"
"github.com/cosmos/iavl/v2/cmd/rollback"
"github.com/cosmos/iavl/v2/cmd/scan"
Expand All @@ -18,6 +19,7 @@ func RootCommand() (*cobra.Command, error) {
snapshot.Command(),
rollback.Command(),
scan.Command(),
bench.Command(),
latestCommand(),
)
return cmd, nil
Expand Down
Loading
Loading