Skip to content

Commit bf9ae40

Browse files
committed
Add Stake thread
1 parent cc6ffbc commit bf9ae40

29 files changed

+424
-368
lines changed

cardano-db-sync/cardano-db-sync.cabal

+2
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ library
114114
Cardano.DbSync.Cache.Epoch
115115
Cardano.DbSync.Cache.FIFO
116116
Cardano.DbSync.Cache.LRU
117+
Cardano.DbSync.Cache.Stake
117118
Cardano.DbSync.Cache.Types
119+
Cardano.DbSync.Cache.Util
118120
Cardano.DbSync.Default
119121
Cardano.DbSync.Epoch
120122

cardano-db-sync/src/Cardano/DbSync.hs

+3-2
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,6 @@ runSyncNode metricsSetters trce iomgr dbConnString runMigrationFnc syncNodeConfi
164164
logInfo trce $ "Using shelley genesis file from: " <> (show . unGenesisFile $ dncShelleyGenesisFile syncNodeConfigFromFile)
165165
logInfo trce $ "Using alonzo genesis file from: " <> (show . unGenesisFile $ dncAlonzoGenesisFile syncNodeConfigFromFile)
166166

167-
let useLedger = shouldUseLedger (sioLedger $ dncInsertOptions syncNodeConfigFromFile)
168-
169167
Db.runIohkLogging trce $
170168
withPostgresqlConn dbConnString $
171169
\backend -> liftIO $ do
@@ -208,6 +206,7 @@ runSyncNode metricsSetters trce iomgr dbConnString runMigrationFnc syncNodeConfi
208206
[ runDbThread syncEnv metricsSetters threadChannels
209207
, runSyncNodeClient metricsSetters syncEnv iomgr trce threadChannels (enpSocketPath syncNodeParams)
210208
, runEpochStakeThread syncEnv
209+
, runStakeThread syncEnv
211210
, runFetchOffChainPoolThread syncEnv
212211
, runFetchOffChainVoteThread syncEnv
213212
, runLedgerStateWriteThread (getTrace syncEnv) (envLedgerEnv syncEnv)
@@ -221,6 +220,8 @@ runSyncNode metricsSetters trce iomgr dbConnString runMigrationFnc syncNodeConfi
221220
removeJsonbFromSchemaConfig = ioRemoveJsonbFromSchema $ soptInsertOptions syncOptions
222221
maybeLedgerDir = enpMaybeLedgerStateDir syncNodeParams
223222

223+
useLedger = shouldUseLedger (sioLedger $ dncInsertOptions syncNodeConfigFromFile)
224+
224225
logProtocolMagicId :: Trace IO Text -> Crypto.ProtocolMagicId -> ExceptT SyncNodeError IO ()
225226
logProtocolMagicId tracer pm =
226227
liftIO

cardano-db-sync/src/Cardano/DbSync/Api.hs

+8-5
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ import qualified Cardano.Chain.Genesis as Byron
5151
import Cardano.Crypto.ProtocolMagic (ProtocolMagicId (..))
5252
import qualified Cardano.Db as DB
5353
import Cardano.DbSync.Api.Types
54-
import Cardano.DbSync.Cache.Types (CacheCapacity (..), newEmptyCache, useNoCache)
54+
import Cardano.DbSync.Cache.Types (CacheCapacity (..), newEmptyCache, newStakeChannels, useNoCache)
5555
import Cardano.DbSync.Config.Cardano
5656
import Cardano.DbSync.Config.Shelley
5757
import Cardano.DbSync.Config.Types
@@ -176,11 +176,12 @@ getSafeBlockNoDiff syncEnv = 2 * getSecurityParam syncEnv
176176
getPruneInterval :: SyncEnv -> Word64
177177
getPruneInterval syncEnv = 10 * getSecurityParam syncEnv
178178

179-
whenConsumeOrPruneTxOut :: (MonadIO m) => SyncEnv -> m () -> m ()
180-
whenConsumeOrPruneTxOut env =
181-
when (DB.pcmConsumedTxOut $ getPruneConsume env)
179+
whenConsumeOrPruneTxOut :: MonadIO m => SyncEnv -> m () -> m ()
180+
whenConsumeOrPruneTxOut syncEnv action = do
181+
disInOut <- liftIO $ getDisableInOutState syncEnv
182+
when (not disInOut && DB.pcmConsumedTxOut (getPruneConsume syncEnv)) action
182183

183-
whenPruneTxOut :: (MonadIO m) => SyncEnv -> m () -> m ()
184+
whenPruneTxOut :: MonadIO m => SyncEnv -> m () -> m ()
184185
whenPruneTxOut env =
185186
when (DB.pcmPruneTxOut $ getPruneConsume env)
186187

@@ -339,6 +340,7 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
339340
bts <- getBootstrapInProgress trce (isTxOutConsumedBootstrap' syncNodeConfigFromFile) backend
340341
bootstrapVar <- newTVarIO bts
341342
-- Offline Pool + Anchor queues
343+
cChans <- newStakeChannels
342344
opwq <- newTBQueueIO 1000
343345
oprq <- newTBQueueIO 1000
344346
oawq <- newTBQueueIO 1000
@@ -378,6 +380,7 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
378380
, envIndexes = indexesVar
379381
, envLedgerEnv = ledgerEnvType
380382
, envNetworkMagic = nwMagic
383+
, envStakeChans = cChans
381384
, envOffChainPoolResultQueue = oprq
382385
, envOffChainPoolWorkQueue = opwq
383386
, envOffChainVoteResultQueue = oarq

cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs

+1-2
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,7 @@ prepareTxOut syncEnv (TxIn txIntxId (TxIx index), txOut) = do
171171
let txHashByteString = Generic.safeHashToByteString $ unTxId txIntxId
172172
let genTxOut = fromTxOut index txOut
173173
txId <- liftLookupFail "prepareTxOut" $ queryTxIdWithCache cache txIntxId
174-
insertTxOut trce cache iopts (txId, txHashByteString) genTxOut
174+
insertTxOut syncEnv iopts (txId, txHashByteString) genTxOut
175175
where
176-
trce = getTrace syncEnv
177176
cache = envCache syncEnv
178177
iopts = soptInsertOptions $ envOptions syncEnv

cardano-db-sync/src/Cardano/DbSync/Api/Types.hs

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ module Cardano.DbSync.Api.Types (
1313
) where
1414

1515
import qualified Cardano.Db as DB
16-
import Cardano.DbSync.Cache.Types (CacheStatus)
16+
import Cardano.DbSync.Cache.Types (CacheStatus, StakeChannels)
1717
import Cardano.DbSync.Config.Types (SyncNodeConfig)
1818
import Cardano.DbSync.Ledger.Types (HasLedgerEnv)
1919
import Cardano.DbSync.LocalStateQuery (NoLedgerEnv)
@@ -48,6 +48,7 @@ data SyncEnv = SyncEnv
4848
, envBootstrap :: !(StrictTVar IO Bool)
4949
, envLedgerEnv :: !LedgerEnv
5050
, envNetworkMagic :: !NetworkMagic
51+
, envStakeChans :: !StakeChannels
5152
, envOffChainPoolResultQueue :: !(StrictTBQueue IO OffChainPoolResult)
5253
, envOffChainPoolWorkQueue :: !(StrictTBQueue IO OffChainPoolWorkQueue)
5354
, envOffChainVoteResultQueue :: !(StrictTBQueue IO OffChainVoteResult)

cardano-db-sync/src/Cardano/DbSync/Cache.hs

+10-138
Original file line numberDiff line numberDiff line change
@@ -16,34 +16,32 @@ module Cardano.DbSync.Cache (
1616
queryPoolKeyOrInsert,
1717
queryPoolKeyWithCache,
1818
queryPrevBlockWithCache,
19-
queryOrInsertStakeAddress,
20-
queryOrInsertRewardAccount,
2119
insertAddressUsingCache,
22-
insertStakeAddress,
23-
queryStakeAddrWithCache,
2420
queryTxIdWithCache,
2521
rollbackCache,
2622
optimiseCaches,
2723
tryUpdateCacheTx,
2824

2925
-- * CacheStatistics
3026
getCacheStatistics,
27+
module X,
3128
) where
3229

3330
import Cardano.BM.Trace
3431
import qualified Cardano.Db as DB
3532
import qualified Cardano.Db.Schema.Variant.TxOut as V
33+
import Cardano.DbSync.Api
34+
import Cardano.DbSync.Api.Types
3635
import Cardano.DbSync.Cache.Epoch (rollbackMapEpochInCache)
3736
import qualified Cardano.DbSync.Cache.FIFO as FIFO
3837
import qualified Cardano.DbSync.Cache.LRU as LRU
38+
import Cardano.DbSync.Cache.Stake as X
3939
import Cardano.DbSync.Cache.Types (CacheAction (..), CacheInternal (..), CacheStatistics (..), CacheStatus (..), StakeCache (..), initCacheStatistics, shouldCache)
40+
import Cardano.DbSync.Cache.Util
4041
import qualified Cardano.DbSync.Era.Shelley.Generic.Util as Generic
41-
import Cardano.DbSync.Era.Shelley.Query
4242
import Cardano.DbSync.Era.Util
4343
import Cardano.DbSync.Error
4444
import Cardano.DbSync.Types
45-
import qualified Cardano.Ledger.Address as Ledger
46-
import Cardano.Ledger.BaseTypes (Network)
4745
import Cardano.Ledger.Mary.Value
4846
import qualified Cardano.Ledger.TxIn as Ledger
4947
import Cardano.Prelude
@@ -109,113 +107,6 @@ getCacheStatistics cs =
109107
NoCache -> pure initCacheStatistics
110108
ActiveCache ci -> readTVarIO (cStats ci)
111109

112-
queryOrInsertRewardAccount ::
113-
(MonadBaseControl IO m, MonadIO m) =>
114-
Trace IO Text ->
115-
CacheStatus ->
116-
CacheAction ->
117-
Ledger.RewardAccount StandardCrypto ->
118-
ReaderT SqlBackend m DB.StakeAddressId
119-
queryOrInsertRewardAccount trce cache cacheUA rewardAddr = do
120-
eiAddrId <- queryStakeAddrWithCacheRetBs trce cache cacheUA rewardAddr
121-
case eiAddrId of
122-
Left (_err, bs) -> insertStakeAddress rewardAddr (Just bs)
123-
Right addrId -> pure addrId
124-
125-
queryOrInsertStakeAddress ::
126-
(MonadBaseControl IO m, MonadIO m) =>
127-
Trace IO Text ->
128-
CacheStatus ->
129-
CacheAction ->
130-
Network ->
131-
StakeCred ->
132-
ReaderT SqlBackend m DB.StakeAddressId
133-
queryOrInsertStakeAddress trce cache cacheUA nw cred =
134-
queryOrInsertRewardAccount trce cache cacheUA $ Ledger.RewardAccount nw cred
135-
136-
-- If the address already exists in the table, it will not be inserted again (due to
137-
-- the uniqueness constraint) but the function will return the 'StakeAddressId'.
138-
insertStakeAddress ::
139-
(MonadBaseControl IO m, MonadIO m) =>
140-
Ledger.RewardAccount StandardCrypto ->
141-
Maybe ByteString ->
142-
ReaderT SqlBackend m DB.StakeAddressId
143-
insertStakeAddress rewardAddr stakeCredBs = do
144-
DB.insertStakeAddress $
145-
DB.StakeAddress
146-
{ DB.stakeAddressHashRaw = addrBs
147-
, DB.stakeAddressView = Generic.renderRewardAccount rewardAddr
148-
, DB.stakeAddressScriptHash = Generic.getCredentialScriptHash $ Ledger.raCredential rewardAddr
149-
}
150-
where
151-
addrBs = fromMaybe (Ledger.serialiseRewardAccount rewardAddr) stakeCredBs
152-
153-
queryStakeAddrWithCache ::
154-
forall m.
155-
MonadIO m =>
156-
Trace IO Text ->
157-
CacheStatus ->
158-
CacheAction ->
159-
Network ->
160-
StakeCred ->
161-
ReaderT SqlBackend m (Either DB.LookupFail DB.StakeAddressId)
162-
queryStakeAddrWithCache trce cache cacheUA nw cred =
163-
mapLeft fst <$> queryStakeAddrWithCacheRetBs trce cache cacheUA (Ledger.RewardAccount nw cred)
164-
165-
queryStakeAddrWithCacheRetBs ::
166-
forall m.
167-
MonadIO m =>
168-
Trace IO Text ->
169-
CacheStatus ->
170-
CacheAction ->
171-
Ledger.RewardAccount StandardCrypto ->
172-
ReaderT SqlBackend m (Either (DB.LookupFail, ByteString) DB.StakeAddressId)
173-
queryStakeAddrWithCacheRetBs _trce cache cacheUA ra@(Ledger.RewardAccount _ cred) = do
174-
let bs = Ledger.serialiseRewardAccount ra
175-
case cache of
176-
NoCache -> rsStkAdrrs bs
177-
ActiveCache ci -> do
178-
withCacheOptimisationCheck ci (rsStkAdrrs bs) $ do
179-
stakeCache <- liftIO $ readTVarIO (cStake ci)
180-
case queryStakeCache cred stakeCache of
181-
Just (addrId, stakeCache') -> do
182-
liftIO $ hitCreds (cStats ci)
183-
case cacheUA of
184-
EvictAndUpdateCache -> do
185-
liftIO $ atomically $ writeTVar (cStake ci) $ deleteStakeCache cred stakeCache'
186-
pure $ Right addrId
187-
_other -> do
188-
liftIO $ atomically $ writeTVar (cStake ci) stakeCache'
189-
pure $ Right addrId
190-
Nothing -> do
191-
queryRes <- mapLeft (,bs) <$> resolveStakeAddress bs
192-
liftIO $ missCreds (cStats ci)
193-
case queryRes of
194-
Left _ -> pure queryRes
195-
Right stakeAddrsId -> do
196-
let !stakeCache' = case cacheUA of
197-
UpdateCache -> stakeCache {scLruCache = LRU.insert cred stakeAddrsId (scLruCache stakeCache)}
198-
UpdateCacheStrong -> stakeCache {scStableCache = Map.insert cred stakeAddrsId (scStableCache stakeCache)}
199-
_otherwise -> stakeCache
200-
liftIO $
201-
atomically $
202-
writeTVar (cStake ci) stakeCache'
203-
pure $ Right stakeAddrsId
204-
where
205-
rsStkAdrrs bs = mapLeft (,bs) <$> resolveStakeAddress bs
206-
207-
-- | True if it was found in LRU
208-
queryStakeCache :: StakeCred -> StakeCache -> Maybe (DB.StakeAddressId, StakeCache)
209-
queryStakeCache scred scache = case Map.lookup scred (scStableCache scache) of
210-
Just addrId -> Just (addrId, scache)
211-
Nothing -> case LRU.lookup scred (scLruCache scache) of
212-
Just (addrId, lru') -> Just (addrId, scache {scLruCache = lru'})
213-
Nothing -> Nothing
214-
215-
deleteStakeCache :: StakeCred -> StakeCache -> StakeCache
216-
deleteStakeCache scred scache =
217-
scache {scStableCache = Map.delete scred (scStableCache scache)}
218-
219110
queryPoolKeyWithCache ::
220111
MonadIO m =>
221112
CacheStatus ->
@@ -352,14 +243,13 @@ insertPoolKeyWithCache cache cacheUA pHash =
352243

353244
queryPoolKeyOrInsert ::
354245
(MonadBaseControl IO m, MonadIO m) =>
246+
SyncEnv ->
355247
Text ->
356-
Trace IO Text ->
357-
CacheStatus ->
358248
CacheAction ->
359249
Bool ->
360250
PoolKeyHash ->
361251
ReaderT SqlBackend m DB.PoolHashId
362-
queryPoolKeyOrInsert txt trce cache cacheUA logsWarning hsh = do
252+
queryPoolKeyOrInsert syncEnv txt cacheUA logsWarning hsh = do
363253
pk <- queryPoolKeyWithCache cache cacheUA hsh
364254
case pk of
365255
Right poolHashId -> pure poolHashId
@@ -377,6 +267,9 @@ queryPoolKeyOrInsert txt trce cache cacheUA logsWarning hsh = do
377267
, ". We will assume that the pool exists and move on."
378268
]
379269
insertPoolKeyWithCache cache cacheUA hsh
270+
where
271+
trce = getTrace syncEnv
272+
cache = envCache syncEnv
380273

381274
queryMAWithCache ::
382275
MonadIO m =>
@@ -544,27 +437,6 @@ insertDatumAndCache cache hsh dt = do
544437
LRU.insert hsh datumId
545438
pure datumId
546439

547-
withCacheOptimisationCheck ::
548-
MonadIO m =>
549-
CacheInternal ->
550-
m a -> -- Action to perform if cache is optimised
551-
m a -> -- Action to perform if cache is not optimised
552-
m a
553-
withCacheOptimisationCheck ci ifOptimised ifNotOptimised = do
554-
isCachedOptimised <- liftIO $ readTVarIO (cIsCacheOptimised ci)
555-
if isCachedOptimised
556-
then ifOptimised
557-
else ifNotOptimised
558-
559-
-- Stakes
560-
hitCreds :: StrictTVar IO CacheStatistics -> IO ()
561-
hitCreds ref =
562-
atomically $ modifyTVar ref (\cs -> cs {credsHits = 1 + credsHits cs, credsQueries = 1 + credsQueries cs})
563-
564-
missCreds :: StrictTVar IO CacheStatistics -> IO ()
565-
missCreds ref =
566-
atomically $ modifyTVar ref (\cs -> cs {credsQueries = 1 + credsQueries cs})
567-
568440
-- Pools
569441
hitPools :: StrictTVar IO CacheStatistics -> IO ()
570442
hitPools ref =

0 commit comments

Comments
 (0)