Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tsavorite kernel #1

Open
wants to merge 42 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a18fa62
WIP on Kernel
TedHartMS Aug 10, 2024
1bc6227
merge and resolve 'main'
TedHartMS Aug 10, 2024
d92bbfe
move SplitChunks to HashTable
TedHartMS Aug 16, 2024
94ccb33
Merge remote-tracking branch 'origin/main' into tsavorite-kernel
TedHartMS Aug 16, 2024
1e3feba
Merge remote-tracking branch 'origin/main' into tsavorite-kernel
TedHartMS Aug 20, 2024
2a3f19e
WIP on Kernel separation
TedHartMS Aug 26, 2024
b82aa06
merge 'main'
TedHartMS Aug 27, 2024
073d5db
WIP on kernalization
TedHartMS Aug 31, 2024
8b92b77
manual locking fix
TedHartMS Aug 31, 2024
5bddc77
Remove SessionFunctionsWrapper from State machines - it's no longer n…
TedHartMS Aug 31, 2024
317e102
Clean up existing ISessionLocker and related to be HashEntryInfo-based
TedHartMS Sep 4, 2024
15c8f66
More tweaks to ISessionLocker
TedHartMS Sep 4, 2024
b64862d
Rename to ExecutionContext
TedHartMS Sep 4, 2024
1fabdf2
merge origin/main, part 1
TedHartMS Sep 26, 2024
3766e6f
merge origin/main
TedHartMS Sep 26, 2024
52e44f6
Some cleanup
TedHartMS Sep 27, 2024
c3b84f0
merge origin/main
TedHartMS Sep 30, 2024
adc9e09
WIP DualContext in Processing layer
TedHartMS Sep 30, 2024
5941b1d
WIP dualContext
TedHartMS Oct 1, 2024
05d2bdc
DualSession in TTL
TedHartMS Oct 7, 2024
70aebc2
Break out ContextOperation.cs and CheckpointOperations.cs from Tsavor…
TedHartMS Oct 7, 2024
a2e9cf3
WIP on <TKeyLocker, TEpochGuard> conversion
TedHartMS Oct 16, 2024
545823c
WIP on DualContext
TedHartMS Oct 23, 2024
5c640b0
More DualContext WIP
TedHartMS Oct 26, 2024
608ac09
more dual WIP
TedHartMS Oct 30, 2024
30a93d6
Dual WIP
TedHartMS Oct 31, 2024
6f92205
WIP dual - update IGarnetInterface pt 1
TedHartMS Nov 1, 2024
963670b
WIP Dual
TedHartMS Nov 2, 2024
1adc787
WIP on Dual
TedHartMS Nov 3, 2024
016c03d
Dual WIP
TedHartMS Nov 4, 2024
4b9ab49
WIP Dual
TedHartMS Nov 4, 2024
03fb520
Dual WIP
TedHartMS Nov 5, 2024
1179422
WIP Dual -- GarnetApi
TedHartMS Nov 6, 2024
482d017
WIP dual
TedHartMS Nov 6, 2024
4aad2eb
WIP Dual (IKeyLocker rename)
TedHartMS Nov 6, 2024
7177d0b
WIP Dual (ResetModified, etc.)
TedHartMS Nov 7, 2024
835b3bf
WIP Dual (Transient*Lock param tweaks)
TedHartMS Nov 7, 2024
fbe2cf5
WIP Dual (first build)
TedHartMS Nov 8, 2024
0352691
WIP Dual (first test runs)
TedHartMS Nov 9, 2024
6b9ea53
WIP dual (fixes for stackCtx)
TedHartMS Nov 13, 2024
d9fa4de
WIP dual: fix some tests
TedHartMS Nov 14, 2024
8cfa79e
WIP Dual (minor cleanup)
TedHartMS Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>,
GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>>>;
using BasicGarnetApi = GarnetApi<TransientKeyLocker, GarnetSafeEpochGuard>;

/// <summary>
/// Cluster manager
Expand Down
19 changes: 7 additions & 12 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>,
GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>>>;
using BasicGarnetApi = GarnetApi<TransientKeyLocker, GarnetSafeEpochGuard>;

/// <summary>
/// Cluster provider
Expand Down Expand Up @@ -142,8 +137,8 @@ public void FlushConfig()
/// <inheritdoc />
public void FlushDB(bool unsafeTruncateLog = false)
{
storeWrapper.store.Log.ShiftBeginAddress(storeWrapper.store.Log.TailAddress, truncateLog: unsafeTruncateLog);
storeWrapper.objectStore?.Log.ShiftBeginAddress(storeWrapper.objectStore.Log.TailAddress, truncateLog: unsafeTruncateLog);
storeWrapper.Store.Log.ShiftBeginAddress(storeWrapper.Store.Log.TailAddress, truncateLog: unsafeTruncateLog);
storeWrapper.ObjectStore?.Log.ShiftBeginAddress(storeWrapper.ObjectStore.Log.TailAddress, truncateLog: unsafeTruncateLog);
}

/// <inheritdoc />
Expand All @@ -153,7 +148,7 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover

if (storeType is StoreType.Main or StoreType.All)
{
entry.storeVersion = storeWrapper.store.CurrentVersion;
entry.storeVersion = storeWrapper.Store.CurrentVersion;
entry.storeHlogToken = storeCheckpointToken;
entry.storeIndexToken = storeCheckpointToken;
entry.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
Expand All @@ -162,7 +157,7 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover

if (storeType is StoreType.Object or StoreType.All)
{
entry.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion;
entry.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.ObjectStore.CurrentVersion;
entry.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
Expand Down Expand Up @@ -290,8 +285,8 @@ internal ReplicationLogCheckpointManager GetReplicationLogCheckpointManager(Stor
Debug.Assert(serverOptions.EnableCluster);
return storeType switch
{
StoreType.Main => (ReplicationLogCheckpointManager)storeWrapper.store.CheckpointManager,
StoreType.Object => (ReplicationLogCheckpointManager)storeWrapper.objectStore?.CheckpointManager,
StoreType.Main => (ReplicationLogCheckpointManager)storeWrapper.Store.CheckpointManager,
StoreType.Object => (ReplicationLogCheckpointManager)storeWrapper.ObjectStore?.CheckpointManager,
_ => throw new Exception($"GetCkptManager: unexpected state {storeType}")
};
}
Expand Down
4 changes: 2 additions & 2 deletions libs/cluster/Server/Migration/MigrateSessionKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,14 @@ public bool MigrateKeys()
return false;

// Migrate main store keys
_gcs.InitMigrateBuffer(clusterProvider.storeWrapper.loggingFrequncy);
_gcs.InitMigrateBuffer(clusterProvider.storeWrapper.loggingFrequency);
if (!MigrateKeysFromMainStore())
return false;

// Migrate object store keys
if (!clusterProvider.serverOptions.DisableObjects)
{
_gcs.InitMigrateBuffer(clusterProvider.storeWrapper.loggingFrequncy);
_gcs.InitMigrateBuffer(clusterProvider.storeWrapper.loggingFrequency);
if (!MigrateKeysFromObjectStore())
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions libs/cluster/Server/Migration/MigrateSessionSlots.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal sealed unsafe partial class MigrateSession : IDisposable
public bool MigrateSlotsDriver()
{
logger?.LogTrace("Initializing MainStore Iterator");
var storeTailAddress = clusterProvider.storeWrapper.store.Log.TailAddress;
var storeTailAddress = clusterProvider.storeWrapper.Store.Log.TailAddress;
var bufferSize = 1 << clusterProvider.serverOptions.PageSizeBits();
MigrationKeyIterationFunctions.MainStoreGetKeysInSlots mainStoreGetKeysInSlots = new(this, _sslots, bufferSize: bufferSize);

Expand Down Expand Up @@ -55,7 +55,7 @@ public bool MigrateSlotsDriver()
if (!clusterProvider.serverOptions.DisableObjects)
{
logger?.LogTrace("Initializing ObjectStore Iterator");
var objectStoreTailAddress = clusterProvider.storeWrapper.objectStore.Log.TailAddress;
var objectStoreTailAddress = clusterProvider.storeWrapper.ObjectStore.Log.TailAddress;
var objectBufferSize = 1 << clusterProvider.serverOptions.ObjectStorePageSizeBits();
MigrationKeyIterationFunctions.ObjectStoreGetKeysInSlots objectStoreGetKeysInSlots = new(this, _sslots, bufferSize: objectBufferSize);

Expand Down
8 changes: 4 additions & 4 deletions libs/cluster/Server/Replication/CheckpointStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,20 +272,20 @@ public CheckpointEntry GetLatestCheckpointEntryFromDisk()
{
Guid objectStoreHLogToken = default;
Guid objectStoreIndexToken = default;
storeWrapper.store.GetLatestCheckpointTokens(out var storeHLogToken, out var storeIndexToken);
storeWrapper.objectStore?.GetLatestCheckpointTokens(out objectStoreHLogToken, out objectStoreIndexToken);
storeWrapper.Store.GetLatestCheckpointTokens(out var storeHLogToken, out var storeIndexToken);
storeWrapper.ObjectStore?.GetLatestCheckpointTokens(out objectStoreHLogToken, out objectStoreIndexToken);
var (storeCheckpointCoveredAofAddress, storePrimaryReplId) = GetCheckpointCookieMetadata(StoreType.Main, storeHLogToken);
var (objectCheckpointCoveredAofAddress, objectStorePrimaryReplId) = objectStoreHLogToken == default ? (long.MaxValue, null) : GetCheckpointCookieMetadata(StoreType.Object, objectStoreHLogToken);

CheckpointEntry entry = new()
{
storeVersion = storeHLogToken == default ? -1 : storeWrapper.store.GetLatestCheckpointVersion(),
storeVersion = storeHLogToken == default ? -1 : storeWrapper.Store.GetLatestCheckpointVersion(),
storeHlogToken = storeHLogToken,
storeIndexToken = storeIndexToken,
storeCheckpointCoveredAofAddress = storeCheckpointCoveredAofAddress,
storePrimaryReplId = storePrimaryReplId,

objectStoreVersion = objectStoreHLogToken == default ? -1 : storeWrapper.objectStore.GetLatestCheckpointVersion(),
objectStoreVersion = objectStoreHLogToken == default ? -1 : storeWrapper.ObjectStore.GetLatestCheckpointVersion(),
objectStoreHlogToken = objectStoreHLogToken,
objectStoreIndexToken = objectStoreIndexToken,
objectCheckpointCoveredAofAddress = objectCheckpointCoveredAofAddress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public bool TryAcquireSettledMetadataForMainStore(CheckpointEntry entry, out Log
index_size = -1;
try
{
hlog_size = storeWrapper.store.GetLogFileSize(entry.storeHlogToken);
index_size = storeWrapper.store.GetIndexFileSize(entry.storeIndexToken);
hlog_size = storeWrapper.Store.GetLogFileSize(entry.storeHlogToken);
index_size = storeWrapper.Store.GetIndexFileSize(entry.storeIndexToken);
return true;
}
catch
Expand All @@ -54,8 +54,8 @@ public bool TryAcquireSettledMetadataForObjectStore(CheckpointEntry entry, out L
index_size = -1;
try
{
hlog_size = storeWrapper.objectStore.GetLogFileSize(entry.objectStoreHlogToken);
index_size = storeWrapper.objectStore.GetIndexFileSize(entry.objectStoreIndexToken);
hlog_size = storeWrapper.ObjectStore.GetLogFileSize(entry.objectStoreHlogToken);
index_size = storeWrapper.ObjectStore.GetIndexFileSize(entry.objectStoreIndexToken);
return true;
}
catch
Expand Down
6 changes: 3 additions & 3 deletions libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public ReplicationLogCheckpointManager GetCkptManager(StoreType storeType)
{
return storeType switch
{
StoreType.Main => (ReplicationLogCheckpointManager)storeWrapper.store.CheckpointManager,
StoreType.Object => (ReplicationLogCheckpointManager)storeWrapper.objectStore?.CheckpointManager,
StoreType.Main => (ReplicationLogCheckpointManager)storeWrapper.Store.CheckpointManager,
StoreType.Object => (ReplicationLogCheckpointManager)storeWrapper.ObjectStore?.CheckpointManager,
_ => throw new Exception($"GetCkptManager: unexpected state {storeType}")
};
}
Expand Down Expand Up @@ -103,7 +103,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null

// Set the appendOnlyFile field for all stores
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).checkpointVersionShift = CheckpointVersionShift;
if (storeWrapper.objectStore != null)
if (storeWrapper.ObjectStore != null)
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).checkpointVersionShift = CheckpointVersionShift;

// If this node starts as replica, it cannot serve requests until it is connected to primary
Expand Down
10 changes: 3 additions & 7 deletions libs/cluster/Session/ClusterSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>,
GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>>>;
using BasicGarnetApi = GarnetApi<TransientKeyLocker, GarnetSafeEpochGuard>;

internal sealed unsafe partial class ClusterSession : IClusterSession
{
Expand Down Expand Up @@ -50,7 +45,8 @@ internal sealed unsafe partial class ClusterSession : IClusterSession
public void SetReadOnlySession() => readWriteSession = false;
public void SetReadWriteSession() => readWriteSession = true;

public ClusterSession(ClusterProvider clusterProvider, TransactionManager txnManager, IGarnetAuthenticator authenticator, User user, GarnetSessionMetrics sessionMetrics, BasicGarnetApi basicGarnetApi, INetworkSender networkSender, ILogger logger = null)
public ClusterSession(ClusterProvider clusterProvider, TransactionManager txnManager, IGarnetAuthenticator authenticator, User user, GarnetSessionMetrics sessionMetrics,
BasicGarnetApi basicGarnetApi, INetworkSender networkSender, ILogger logger = null)
{
this.clusterProvider = clusterProvider;
this.authenticator = authenticator;
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Session/RespClusterMigrateCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private void TrackImportProgress(int keyCount, bool isMainStore, bool completed
{
totalKeyCount += keyCount;
var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog);
if (completed || lastLog == 0 || duration >= clusterProvider.storeWrapper.loggingFrequncy)
if (completed || lastLog == 0 || duration >= clusterProvider.storeWrapper.loggingFrequency)
{
logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount})", completed ? "COMPLETED" : "IMPORTING", isMainStore, totalKeyCount.ToString("N0"));
lastLog = Stopwatch.GetTimestamp();
Expand Down
Loading