Skip to content

Commit

Permalink
refactor: improve snapshot generation performance
Browse files Browse the repository at this point in the history
Makes all snapshot tables long-lived with proper unique constraints.

Previously, snapshot tables were dropped (if they existed) and
re-generated for a given snapshot date. This results in a lot of
data being generated and thrown out for no reason. It also meant
that we couldnt put proper indexes on these tables since they would
disappear when the table was dropped and re-created.
  • Loading branch information
seanmcgary committed Feb 25, 2025
1 parent ffacc18 commit 4005a00
Show file tree
Hide file tree
Showing 19 changed files with 347 additions and 224 deletions.
2 changes: 1 addition & 1 deletion .testdataVersion
Original file line number Diff line number Diff line change
@@ -1 +1 @@
da8f00be49d1447f22934629d9d6819e3d763af9
d67ef5d895bdc0ccd6a006a1683ac3e58f820ad0
2 changes: 1 addition & 1 deletion pkg/postgres/migrations/202410241239_combinedRewards/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error {
duration integer,
block_number bigint not null,
block_time timestamp without time zone not null,
block_date date not null,
block_date text not null,
reward_type varchar not null
)`,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package _202502180836_snapshotUniqueConstraints

import (
"database/sql"
"github.com/Layr-Labs/sidecar/internal/config"
"github.com/pkg/errors"
"gorm.io/gorm"
)

type Migration struct {
}

func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error {
queries := []string{
`alter table staker_shares add constraint uniq_staker_shares unique (staker, strategy, transaction_hash, log_index, block_number)`,
`create table if not exists staker_share_snapshots (
staker varchar,
strategy varchar,
shares numeric,
snapshot date
)`,
`alter table staker_share_snapshots add constraint uniq_staker_share_snapshots unique (staker, strategy, snapshot)`,

// re-add indexes that likely got nuked due to dropping and re-creating the snapshot table
`create index if not exists idx_staker_share_snapshots_staker_strategy_snapshot on staker_share_snapshots (staker, strategy, snapshot)`,
`create index if not exists idx_staker_share_snapshots_strategy_snapshot on staker_share_snapshots (strategy, snapshot)`,

`alter table staker_delegation_snapshots add constraint uniq_staker_delegation_snapshots unique (staker, operator, snapshot)`,
`create index if not exists idx_staker_delegation_snapshots_operator_snapshot on staker_delegation_snapshots (operator, snapshot)`,

`alter table operator_share_snapshots add constraint uniq_operator_share_snapshots unique (operator, strategy, snapshot)`,

`alter table operator_shares add constraint uniq_operator_shares unique (operator, strategy, transaction_hash, log_index, block_number)`,

`alter table operator_pi_split_snapshots add constraint uniq_operator_pi_split_snapshots unique (operator, split, snapshot)`,

`create table if not exists operator_directed_rewards(
avs varchar,
reward_hash varchar,
token varchar,
operator varchar,
operator_index integer,
amount numeric,
strategy varchar,
strategy_index integer,
multiplier numeric(78),
start_timestamp timestamp(6),
end_timestamp timestamp(6),
duration bigint,
block_number bigint,
block_time timestamp(6),
block_date text
);`,
`alter table operator_directed_rewards add constraint uniq_operator_directed_rewards unique (avs, reward_hash, strategy_index, operator_index)`,

`alter table operator_avs_strategy_snapshots add constraint uniq_operator_avs_strategy_snapshots unique (operator, avs, strategy, snapshot)`,

`alter table operator_avs_registration_snapshots add constraint uniq_operator_avs_registration_snapshots unique (operator, avs, snapshot)`,

`alter table default_operator_split_snapshots add constraint uniq_default_operator_split_snapshots unique (snapshot)`,

`alter table combined_rewards add constraint uniq_combined_rewards unique (avs, reward_hash, strategy_index)`,

`alter table operator_avs_split_snapshots add constraint uniq_operator_avs_split_snapshots unique (operator, avs, snapshot)`,
}

for _, query := range queries {
res := grm.Exec(query)
if res.Error != nil {
return errors.Wrapf(res.Error, "failed to execute query: %s", query)
}
}
return nil
}

func (m *Migration) GetName() string {
return "202502180836_snapshotUniqueConstraints"
}
2 changes: 2 additions & 0 deletions pkg/postgres/migrations/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
_202501241111_addIndexesForRpcFunctions "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202501241111_addIndexesForRpcFunctions"
_202502100846_goldTableRewardHashIndex "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202502100846_goldTableRewardHashIndex"
_202502180836_snapshotUniqueConstraints "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202502180836_snapshotUniqueConstraints"
_202502211539_hydrateClaimedRewards "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202502211539_hydrateClaimedRewards"
"time"

Expand Down Expand Up @@ -140,6 +141,7 @@ func (m *Migrator) MigrateAll() error {
&_202501241111_addIndexesForRpcFunctions.Migration{},
&_202502100846_goldTableRewardHashIndex.Migration{},
&_202502211539_hydrateClaimedRewards.Migration{},
&_202502180836_snapshotUniqueConstraints.Migration{},
}

for _, migration := range migrations {
Expand Down
94 changes: 51 additions & 43 deletions pkg/rewards/combinedRewards.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,69 @@
package rewards

import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
import (
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
"go.uber.org/zap"
)

const rewardsCombinedQuery = `
with combined_rewards as (
select
rs.avs,
rs.reward_hash,
rs.token,
rs.amount,
rs.strategy,
rs.strategy_index,
rs.multiplier,
rs.start_timestamp,
rs.end_timestamp,
rs.duration,
rs.block_number,
b.block_time::timestamp(6),
to_char(b.block_time, 'YYYY-MM-DD') AS block_date,
rs.reward_type
from reward_submissions as rs
left join blocks as b on (b.number = rs.block_number)
-- pipeline bronze table uses this to filter the correct records
where b.block_time < TIMESTAMP '{{.cutoffDate}}'
)
insert into combined_rewards (avs, reward_hash, token, amount, start_timestamp, duration, end_timestamp, strategy, multiplier, strategy_index, block_number, block_time, block_date, reward_type)
with _combined_rewards as (
select
avs,
reward_hash,
token,
amount,
start_timestamp,
duration,
end_timestamp,
strategy,
multiplier,
strategy_index,
block_number,
block_time,
block_date,
reward_type
from combined_rewards
rs.avs,
rs.reward_hash,
rs.token,
rs.amount,
rs.strategy,
rs.strategy_index,
rs.multiplier,
rs.start_timestamp,
rs.end_timestamp,
rs.duration,
rs.block_number,
b.block_time::timestamp(6),
to_char(b.block_time, 'YYYY-MM-DD') AS block_date,
rs.reward_type
from reward_submissions as rs
left join blocks as b on (b.number = rs.block_number)
-- pipeline bronze table uses this to filter the correct records
where b.block_time < TIMESTAMP '{{.cutoffDate}}'
)
select
avs,
reward_hash,
token,
amount,
start_timestamp,
duration,
end_timestamp,
strategy,
multiplier,
strategy_index,
block_number,
block_time,
block_date,
reward_type
from _combined_rewards
on conflict on constraint uniq_combined_rewards do nothing;
`

func (r *RewardsCalculator) GenerateAndInsertCombinedRewards(snapshotDate string) error {
tableName := "combined_rewards"

query, err := rewardsUtils.RenderQueryTemplate(rewardsCombinedQuery, map[string]interface{}{
"cutoffDate": snapshotDate,
})
if err != nil {
r.logger.Sugar().Errorw("Failed to render rewards combined query", "error", err)
r.logger.Sugar().Errorw("Failed to render rewards combined query",
zap.Error(err),
)
return err
}

err = r.generateAndInsertFromQuery(tableName, query, nil)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate combined rewards", "error", err)
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate combined rewards",
zap.String("snapshotDate", snapshotDate),
zap.Error(res.Error),
)
return err
}
return nil
Expand Down
27 changes: 18 additions & 9 deletions pkg/rewards/defaultOperatorSplitSnapshots.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package rewards

import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
import (
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
"go.uber.org/zap"
)

const defaultOperatorSplitSnapshotQuery = `
insert into default_operator_split_snapshots (split, snapshot)
WITH default_operator_splits_with_block_info as (
select
dos.new_default_operator_split_bips as split,
Expand Down Expand Up @@ -56,24 +60,29 @@ final_results as (
CROSS JOIN
generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS d
)
select * from final_results
select
split,
snapshot
from final_results
on conflict on constraint uniq_default_operator_split_snapshots do nothing;
`

func (r *RewardsCalculator) GenerateAndInsertDefaultOperatorSplitSnapshots(snapshotDate string) error {
tableName := "default_operator_split_snapshots"

query, err := rewardsUtils.RenderQueryTemplate(defaultOperatorSplitSnapshotQuery, map[string]interface{}{
"cutoffDate": snapshotDate,
})
if err != nil {
r.logger.Sugar().Errorw("Failed to render query template", "error", err)
r.logger.Sugar().Errorw("Failed to render query template", zap.Error(err))
return err
}

err = r.generateAndInsertFromQuery(tableName, query, nil)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate default_operator_split_snapshots", "error", err)
return err
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate default_operator_split_snapshots",
zap.String("snapshotDate", snapshotDate),
zap.Error(res.Error),
)
return res.Error
}
return nil
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/rewards/operatorAvsRegistrationSnapshots.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package rewards

import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
import (
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
"go.uber.org/zap"
)

// Operator AVS Registration Windows: Ranges at which an operator has registered for an AVS
// 0. Ranked: Rank the operator state changes by block_time and log_index since sqlite lacks LEAD/LAG functions
Expand All @@ -18,6 +21,7 @@ import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
// Entry Exit
// Since exits (deregistrations) are rounded down, we must only look at the day 2 snapshot on a pipeline run on day 3.
const operatorAvsRegistrationSnapshotsQuery = `
insert into operator_avs_registration_snapshots (operator, avs, snapshot)
WITH state_changes as (
select
aosc.*,
Expand Down Expand Up @@ -100,23 +104,25 @@ SELECT
d AS snapshot
FROM cleaned_records
CROSS JOIN generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS d
on conflict on constraint uniq_operator_avs_registration_snapshots do nothing;
`

func (r *RewardsCalculator) GenerateAndInsertOperatorAvsRegistrationSnapshots(snapshotDate string) error {
tableName := "operator_avs_registration_snapshots"

query, err := rewardsUtils.RenderQueryTemplate(operatorAvsRegistrationSnapshotsQuery, map[string]interface{}{
"cutoffDate": snapshotDate,
})
if err != nil {
r.logger.Sugar().Errorw("Failed to render operator AVS registration snapshots query", "error", err)
r.logger.Sugar().Errorw("Failed to render operator AVS registration snapshots query", zap.Error(err))
return err
}

err = r.generateAndInsertFromQuery(tableName, query, nil)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate operator_avs_registration_snapshots", "error", err)
return err
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate operator_avs_registration_snapshots",
zap.String("snapshotDate", snapshotDate),
zap.Error(res.Error),
)
return res.Error
}
return nil
}
Expand Down
25 changes: 18 additions & 7 deletions pkg/rewards/operatorAvsSplitSnapshots.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package rewards

import "github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
import (
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
"go.uber.org/zap"
)

const operatorAvsSplitSnapshotQuery = `
insert into operator_avs_split_snapshots (operator, avs, split, snapshot)
WITH operator_avs_splits_with_block_info as (
select
oas.operator,
Expand Down Expand Up @@ -62,12 +66,16 @@ final_results as (
CROSS JOIN
generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS d
)
select * from final_results
select
operator,
avs,
split,
snapshot
from final_results
on conflict on constraint uniq_operator_avs_split_snapshots do nothing
`

func (r *RewardsCalculator) GenerateAndInsertOperatorAvsSplitSnapshots(snapshotDate string) error {
tableName := "operator_avs_split_snapshots"

query, err := rewardsUtils.RenderQueryTemplate(operatorAvsSplitSnapshotQuery, map[string]interface{}{
"cutoffDate": snapshotDate,
})
Expand All @@ -76,9 +84,12 @@ func (r *RewardsCalculator) GenerateAndInsertOperatorAvsSplitSnapshots(snapshotD
return err
}

err = r.generateAndInsertFromQuery(tableName, query, nil)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate operator_avs_split_snapshots", "error", err)
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate operator_avs_split_snapshots",
zap.String("snapshotDate", snapshotDate),
zap.Error(err),
)
return err
}
return nil
Expand Down
Loading

0 comments on commit 4005a00

Please sign in to comment.