Skip to content

Commit cc6ffbc

Browse files
committed
Add Epoch Stake Thread
1 parent 3a9650f commit cc6ffbc

File tree

14 files changed

+222
-163
lines changed

14 files changed

+222
-163
lines changed

cardano-db-sync/cardano-db-sync.cabal

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ library
4545
Cardano.DbSync.Api
4646
Cardano.DbSync.Api.Ledger
4747
Cardano.DbSync.Api.Types
48+
Cardano.DbSync.Ledger.Async
4849
Cardano.DbSync.Config
4950
Cardano.DbSync.Config.Alonzo
5051
Cardano.DbSync.Config.Byron
@@ -130,6 +131,7 @@ library
130131
Cardano.DbSync.LocalStateQuery
131132
Cardano.DbSync.StateQuery
132133
Cardano.DbSync.Sync
134+
Cardano.DbSync.Threads
133135
Cardano.DbSync.Tracing.ToObjectOrphans
134136
Cardano.DbSync.Types
135137

cardano-db-sync/src/Cardano/DbSync.hs

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import Cardano.DbSync.Ledger.State
4141
import Cardano.DbSync.OffChain (runFetchOffChainPoolThread, runFetchOffChainVoteThread)
4242
import Cardano.DbSync.Rollback (unsafeRollback)
4343
import Cardano.DbSync.Sync (runSyncNodeClient)
44+
import Cardano.DbSync.Threads
4445
import Cardano.DbSync.Tracing.ToObjectOrphans ()
4546
import Cardano.DbSync.Types
4647
import Cardano.DbSync.Util.Constraint (queryIsJsonbInSchema)
@@ -206,6 +207,7 @@ runSyncNode metricsSetters trce iomgr dbConnString runMigrationFnc syncNodeConfi
206207
id
207208
[ runDbThread syncEnv metricsSetters threadChannels
208209
, runSyncNodeClient metricsSetters syncEnv iomgr trce threadChannels (enpSocketPath syncNodeParams)
210+
, runEpochStakeThread syncEnv
209211
, runFetchOffChainPoolThread syncEnv
210212
, runFetchOffChainVoteThread syncEnv
211213
, runLedgerStateWriteThread (getTrace syncEnv) (envLedgerEnv syncEnv)

cardano-db-sync/src/Cardano/DbSync/Cache/Types.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ import Ouroboros.Consensus.Cardano.Block (StandardCrypto)
5252

5353
type StakePoolCache = Map PoolKeyHash DB.PoolHashId
5454

55-
-- | We use a stable cache for entries that are expected to be reused frequentyl.
55+
-- | We use a stable cache for entries that are expected to be reused frequently.
5656
-- These are stake addresses that have rewards, delegations etc.
5757
-- They are never removed unless manually eg when it's deregistered
5858
-- The LRU cache is much smaller for the rest stake addresses.

cardano-db-sync/src/Cardano/DbSync/Default.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
142142
isMember
143143
applyResult
144144

145-
-- Here we insert the block and it's txs, but in adition we also cache some values which we later
145+
-- Here we insert the block and it's txs, but in addition we also cache some values which we later
146146
-- use when updating the Epoch, thus saving us having to recalulating them later.
147147
case cblk of
148148
BlockByron blk ->

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Generic/StakeDist.hs

+31-98
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,22 @@ module Cardano.DbSync.Era.Shelley.Generic.StakeDist (
1313
StakeSliceRes (..),
1414
StakeSlice (..),
1515
getSecurityParameter,
16-
getStakeSlice,
16+
snapShotToList,
17+
getSnapShot,
1718
getPoolDistr,
1819
) where
1920

2021
import Cardano.DbSync.Types
2122
import Cardano.Ledger.Coin (Coin (..))
2223
import qualified Cardano.Ledger.Compactible as Ledger
23-
import Cardano.Ledger.Credential (Credential)
2424
import qualified Cardano.Ledger.EpochBoundary as Ledger
2525
import Cardano.Ledger.Era (EraCrypto)
26-
import Cardano.Ledger.Keys (KeyHash (..), KeyRole (..))
2726
import qualified Cardano.Ledger.Shelley.LedgerState as Shelley
2827
import Cardano.Ledger.Val ((<+>))
2928
import Cardano.Prelude
3029
import qualified Data.Map.Strict as Map
3130
import Data.VMap (VB, VMap (..), VP)
3231
import qualified Data.VMap as VMap
33-
import qualified Data.Vector.Generic as VG
3432
import Lens.Micro
3533
import Ouroboros.Consensus.Block
3634
import Ouroboros.Consensus.Cardano.Block (LedgerState (..), StandardCrypto)
@@ -53,9 +51,6 @@ data StakeSlice = StakeSlice
5351
}
5452
deriving (Eq)
5553

56-
emptySlice :: EpochNo -> StakeSlice
57-
emptySlice epoch = StakeSlice epoch Map.empty
58-
5954
getSecurityParameter ::
6055
ConsensusProtocol (BlockProtocol blk) =>
6156
ProtocolInfo blk ->
@@ -70,112 +65,50 @@ getSecurityParameter = maxRollbacks . configSecurityParam . pInfoConfig
7065
-- On mainnet, for a value minSliceSize = 2000, it will be used as the actual size of slices
7166
-- until the size of delegations grows up to 8.6M, in which case, the size of slices
7267
-- will be adjusted.
73-
getStakeSlice ::
74-
ConsensusProtocol (BlockProtocol blk) =>
75-
ProtocolInfo blk ->
76-
Word64 ->
68+
getSnapShot ::
7769
ExtLedgerState CardanoBlock ->
78-
Bool ->
79-
StakeSliceRes
80-
getStakeSlice pInfo !epochBlockNo els isMigration =
70+
Maybe (Ledger.SnapShot StandardCrypto, EpochNo)
71+
getSnapShot els =
8172
case ledgerState els of
82-
LedgerStateByron _ -> NoSlices
83-
LedgerStateShelley sls -> genericStakeSlice pInfo epochBlockNo sls isMigration
84-
LedgerStateAllegra als -> genericStakeSlice pInfo epochBlockNo als isMigration
85-
LedgerStateMary mls -> genericStakeSlice pInfo epochBlockNo mls isMigration
86-
LedgerStateAlonzo als -> genericStakeSlice pInfo epochBlockNo als isMigration
87-
LedgerStateBabbage bls -> genericStakeSlice pInfo epochBlockNo bls isMigration
88-
LedgerStateConway cls -> genericStakeSlice pInfo epochBlockNo cls isMigration
89-
90-
genericStakeSlice ::
91-
forall era c blk p.
92-
(c ~ StandardCrypto, EraCrypto era ~ c, ConsensusProtocol (BlockProtocol blk)) =>
93-
ProtocolInfo blk ->
94-
Word64 ->
73+
LedgerStateByron _ -> Nothing
74+
LedgerStateShelley sls -> Just $ genericSnapShot sls
75+
LedgerStateAllegra als -> Just $ genericSnapShot als
76+
LedgerStateMary mls -> Just $ genericSnapShot mls
77+
LedgerStateAlonzo als -> Just $ genericSnapShot als
78+
LedgerStateBabbage bls -> Just $ genericSnapShot bls
79+
LedgerStateConway cls -> Just $ genericSnapShot cls
80+
81+
genericSnapShot ::
82+
forall era p.
83+
(EraCrypto era ~ StandardCrypto) =>
9584
LedgerState (ShelleyBlock p era) ->
96-
Bool ->
97-
StakeSliceRes
98-
genericStakeSlice pInfo epochBlockNo lstate isMigration
99-
| index > delegationsLen = NoSlices
100-
| index == delegationsLen = Slice (emptySlice epoch) True
101-
| index + size > delegationsLen = Slice (mkSlice (delegationsLen - index)) True
102-
| otherwise = Slice (mkSlice size) False
85+
(Ledger.SnapShot StandardCrypto, EpochNo)
86+
genericSnapShot lstate = (stakeSnapshot, epoch)
10387
where
104-
epoch :: EpochNo
105-
epoch = EpochNo $ 1 + unEpochNo (Shelley.nesEL (Consensus.shelleyLedgerState lstate))
106-
107-
minSliceSize :: Word64
108-
minSliceSize = 2000
109-
110-
-- On mainnet this is 2160
111-
k :: Word64
112-
k = getSecurityParameter pInfo
113-
11488
-- We use 'ssStakeMark' here. That means that when these values
11589
-- are added to the database, the epoch number where they become active is the current
11690
-- epoch plus one.
117-
stakeSnapshot :: Ledger.SnapShot c
91+
stakeSnapshot :: Ledger.SnapShot StandardCrypto
11892
stakeSnapshot =
11993
Ledger.ssStakeMark . Shelley.esSnapshots . Shelley.nesEs $
12094
Consensus.shelleyLedgerState lstate
12195

122-
delegations :: VMap.KVVector VB VB (Credential 'Staking c, KeyHash 'StakePool c)
123-
delegations = VMap.unVMap $ Ledger.ssDelegations stakeSnapshot
124-
125-
delegationsLen :: Word64
126-
delegationsLen = fromIntegral $ VG.length delegations
96+
epoch = EpochNo $ 1 + unEpochNo (Shelley.nesEL (Consensus.shelleyLedgerState lstate))
12797

128-
stakes :: VMap VB VP (Credential 'Staking c) (Ledger.CompactForm Coin)
129-
stakes = Ledger.unStake $ Ledger.ssStake stakeSnapshot
98+
snapShotToList ::
99+
Ledger.SnapShot StandardCrypto ->
100+
[(StakeCred, (Coin, PoolKeyHash))]
101+
snapShotToList snapShot =
102+
VMap.toList $
103+
VMap.mapMaybe id $ -- This line removes entries without stake. Should we assume 0 and insert it?
104+
VMap.mapWithKey (\a p -> (,p) <$> lookupStake a) (Ledger.ssDelegations snapShot)
105+
where
106+
stakes :: VMap VB VP StakeCred (Ledger.CompactForm Coin)
107+
stakes = Ledger.unStake $ Ledger.ssStake snapShot
130108

131-
lookupStake :: Credential 'Staking c -> Maybe Coin
109+
lookupStake :: StakeCred -> Maybe Coin
132110
lookupStake cred = Ledger.fromCompact <$> VMap.lookup cred stakes
133111

134-
-- This is deterministic for the whole epoch and is the constant size of slices
135-
-- until the data are over. This means the last slice could be of smaller size and slices
136-
-- after that will be empty.
137-
epochSliceSize :: Word64
138-
epochSliceSize =
139-
max minSliceSize defaultEpochSliceSize
140-
where
141-
-- On mainnet this is 21600
142-
expectedBlocks :: Word64
143-
expectedBlocks = 10 * k
144-
145-
-- This size of slices is enough to cover the whole list, even if only
146-
-- the 20% of the expected blocks appear in an epoch.
147-
defaultEpochSliceSize :: Word64
148-
defaultEpochSliceSize = 1 + div (delegationsLen * 5) expectedBlocks
149-
150-
-- The starting index of the data in the delegation vector.
151-
index :: Word64
152-
index
153-
| isMigration = 0
154-
| epochBlockNo < k = delegationsLen + 1 -- so it creates the empty Slice.
155-
| otherwise = (epochBlockNo - k) * epochSliceSize
156-
157-
size :: Word64
158-
size
159-
| isMigration, epochBlockNo + 1 < k = 0
160-
| isMigration = (epochBlockNo + 1 - k) * epochSliceSize
161-
| otherwise = epochSliceSize
162-
163-
mkSlice :: Word64 -> StakeSlice
164-
mkSlice actualSize =
165-
StakeSlice
166-
{ sliceEpochNo = epoch
167-
, sliceDistr = distribution
168-
}
169-
where
170-
delegationsSliced :: VMap VB VB (Credential 'Staking c) (KeyHash 'StakePool c)
171-
delegationsSliced = VMap $ VG.slice (fromIntegral index) (fromIntegral actualSize) delegations
172-
173-
distribution :: Map StakeCred (Coin, PoolKeyHash)
174-
distribution =
175-
VMap.toMap $
176-
VMap.mapMaybe id $
177-
VMap.mapWithKey (\a p -> (,p) <$> lookupStake a) delegationsSliced
178-
179112
getPoolDistr ::
180113
ExtLedgerState CardanoBlock ->
181114
Maybe (Map PoolKeyHash (Coin, Word64), Map PoolKeyHash Natural)

cardano-db-sync/src/Cardano/DbSync/Era/Universal/Block.hs

-2
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,6 @@ insertBlockUniversal syncEnv shouldLog withinTwoMins withinHalfHour blk details
152152
whenStrictJust (apNewEpoch applyResult) $ \newEpoch -> do
153153
insertOnNewEpoch syncEnv blkId (Generic.blkSlotNo blk) epochNo newEpoch
154154

155-
insertStakeSlice syncEnv $ apStakeSlice applyResult
156-
157155
when (ioGov iopts && (withinHalfHour || unBlockNo (Generic.blkBlockNo blk) `mod` 10000 == 0))
158156
. lift
159157
$ insertOffChainVoteResults tracer (envOffChainVoteResultQueue syncEnv)

cardano-db-sync/src/Cardano/DbSync/Era/Universal/Epoch.hs

+8-8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module Cardano.DbSync.Era.Universal.Epoch (
1515
insertRewards,
1616
hasNewEpochEvent,
1717
hasEpochStartEvent,
18+
insertEpochStake,
1819
insertRewardRests,
1920
insertProposalRefunds,
2021
insertPoolDepositRefunds,
@@ -200,7 +201,7 @@ insertStakeSlice ::
200201
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
201202
insertStakeSlice _ Generic.NoSlices = pure ()
202203
insertStakeSlice syncEnv (Generic.Slice slice finalSlice) = do
203-
insertEpochStake syncEnv network (Generic.sliceEpochNo slice) (Map.toList $ Generic.sliceDistr slice)
204+
insertEpochStake syncEnv (Generic.sliceEpochNo slice) (Map.toList $ Generic.sliceDistr slice)
204205
when finalSlice $ do
205206
lift $ DB.updateSetComplete $ unEpochNo $ Generic.sliceEpochNo slice
206207
size <- lift $ DB.queryEpochStakeCount (unEpochNo $ Generic.sliceEpochNo slice)
@@ -211,17 +212,14 @@ insertStakeSlice syncEnv (Generic.Slice slice finalSlice) = do
211212
tracer :: Trace IO Text
212213
tracer = getTrace syncEnv
213214

214-
network :: Network
215-
network = getNetwork syncEnv
216-
215+
-- This is used by the epoch stake thread.
217216
insertEpochStake ::
218217
(MonadBaseControl IO m, MonadIO m) =>
219218
SyncEnv ->
220-
Network ->
221219
EpochNo ->
222220
[(StakeCred, (Shelley.Coin, PoolKeyHash))] ->
223221
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
224-
insertEpochStake syncEnv nw epochNo stakeChunk = do
222+
insertEpochStake syncEnv epochNo stakeChunk = do
225223
let cache = envCache syncEnv
226224
DB.ManualDbConstraints {..} <- liftIO $ readTVarIO $ envDbConstraints syncEnv
227225
dbStakes <- mapM (mkStake cache) stakeChunk
@@ -235,8 +233,9 @@ insertEpochStake syncEnv nw epochNo stakeChunk = do
235233
(StakeCred, (Shelley.Coin, PoolKeyHash)) ->
236234
ExceptT SyncNodeError (ReaderT SqlBackend m) DB.EpochStake
237235
mkStake cache (saddr, (coin, pool)) = do
238-
saId <- lift $ queryOrInsertStakeAddress trce cache UpdateCacheStrong nw saddr
239-
poolId <- lift $ queryPoolKeyOrInsert "insertEpochStake" trce cache UpdateCache (ioShelley iopts) pool
236+
-- TODO check that not updating the cache here is not an issue.
237+
saId <- lift $ queryOrInsertStakeAddress trce cache DoNotUpdateCache network saddr
238+
poolId <- lift $ queryPoolKeyOrInsert "insertEpochStake" trce cache DoNotUpdateCache (ioShelley iopts) pool
240239
pure $
241240
DB.EpochStake
242241
{ DB.epochStakeAddrId = saId
@@ -247,6 +246,7 @@ insertEpochStake syncEnv nw epochNo stakeChunk = do
247246

248247
trce = getTrace syncEnv
249248
iopts = getInsertOptions syncEnv
249+
network = getNetwork syncEnv
250250

251251
insertRewards ::
252252
(MonadBaseControl IO m, MonadIO m) =>

cardano-db-sync/src/Cardano/DbSync/Error.hs

+3-7
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@ module Cardano.DbSync.Error (
1010
annotateInvariantTx,
1111
bsBase16Encode,
1212
dbSyncNodeError,
13-
dbSyncInvariant,
1413
renderSyncInvariant,
1514
runOrThrowIO,
16-
fromEitherSTM,
15+
throwLeftIO,
1716
logAndThrowIO,
1817
shouldAbortOnPanic,
1918
hasAbortOnPanicEnv,
@@ -154,9 +153,6 @@ annotateInvariantTx tx ei =
154153
dbSyncNodeError :: (Monad m) => Text -> ExceptT SyncNodeError m a
155154
dbSyncNodeError = left . SNErrDefault
156155

157-
dbSyncInvariant :: (Monad m) => Text -> SyncInvariant -> ExceptT SyncNodeError m a
158-
dbSyncInvariant loc = left . SNErrInvariant loc
159-
160156
renderSyncInvariant :: SyncInvariant -> Text
161157
renderSyncInvariant ei =
162158
case ei of
@@ -174,8 +170,8 @@ renderSyncInvariant ei =
174170
, textShow tx
175171
]
176172

177-
fromEitherSTM :: (Exception e) => Either e a -> STM a
178-
fromEitherSTM = either throwSTM return
173+
throwLeftIO :: Exception e => Either e a -> IO a
174+
throwLeftIO = either throwIO pure
179175

180176
bsBase16Encode :: ByteString -> Text
181177
bsBase16Encode bs =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
{-# LANGUAGE OverloadedStrings #-}
2+
3+
module Cardano.DbSync.Ledger.Async where
4+
5+
import Cardano.DbSync.Ledger.Types
6+
import Cardano.Ledger.BaseTypes (EpochNo)
7+
import Cardano.Ledger.Crypto (StandardCrypto)
8+
import qualified Cardano.Ledger.EpochBoundary as Ledger
9+
import Control.Concurrent.Class.MonadSTM.Strict
10+
import qualified Control.Concurrent.STM.TBQueue as TBQ
11+
12+
newEpochStakeChannels :: IO EpochStakeChannels
13+
newEpochStakeChannels =
14+
-- This may never be more than 1. But let's keep it a queue for extensibility shake.
15+
-- This may allow us to parallelize the events workload even further
16+
EpochStakeChannels
17+
<$> TBQ.newTBQueueIO 1
18+
<*> newTVarIO Nothing
19+
20+
-- To be used by the main thread
21+
ensureEpochDone :: EpochStakeChannels -> EpochNo -> Ledger.SnapShot StandardCrypto -> IO ()
22+
ensureEpochDone sQueue epoch snapshot = atomically $ do
23+
mLastEpochDone <- waitFinished sQueue
24+
case mLastEpochDone of
25+
Just lastEpochDone | lastEpochDone == epoch -> pure ()
26+
_ -> do
27+
-- If last is not already there, put it to list and wait again
28+
writeStakeAction sQueue epoch snapshot True
29+
retry
30+
31+
-- To be used by the main thread
32+
waitFinished :: EpochStakeChannels -> STM IO (Maybe EpochNo)
33+
waitFinished sQueue = do
34+
stakeThreadState <- readTVar (epochResult sQueue)
35+
case stakeThreadState of
36+
Just (lastEpoch, Done) -> pure $ Just lastEpoch -- Normal case
37+
Just (_, Running) -> retry -- Wait to finish current work.
38+
Nothing -> pure Nothing -- This will happen after a restart
39+
40+
-- To be used by the main thread
41+
writeStakeAction :: EpochStakeChannels -> EpochNo -> Ledger.SnapShot StandardCrypto -> Bool -> STM IO ()
42+
writeStakeAction sQueue epoch snapShot checkFirst = do
43+
TBQ.writeTBQueue (estakeQueue sQueue) $ EpochStakeDBAction epoch snapShot checkFirst
44+
writeTVar (epochResult sQueue) $ Just (epoch, Running)

0 commit comments

Comments
 (0)