Skip to content

Commit

Permalink
fix: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Sep 30, 2024
1 parent da402dc commit f29ce5f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 31 deletions.
11 changes: 5 additions & 6 deletions waku/v2/api/history/cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

const defaultBackoff = 10 * time.Second
const graylistBackoff = 3 * time.Minute
const storenodeVerificationInterval = time.Second
const storenodeMaxFailedRequests uint = 2
const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64"
const findNearestMailServer = !isAndroidEmulator
Expand Down Expand Up @@ -267,7 +268,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
}

if pinnedStorenode != "" {
return m.connect(pinnedStorenode)
return m.setActiveStorenode(pinnedStorenode)
}

m.logger.Info("Finding a new storenode..")
Expand Down Expand Up @@ -299,7 +300,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
}

ms := allStorenodes[r.Int64()]
return m.connect(ms)
return m.setActiveStorenode(ms)
}

func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
Expand All @@ -313,9 +314,7 @@ func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
return peer.status
}

func (m *StorenodeCycle) connect(peerID peer.ID) error {
m.logger.Info("connecting to storenode", zap.Stringer("peerID", peerID))

func (m *StorenodeCycle) setActiveStorenode(peerID peer.ID) error {
m.activeStorenode = peerID

m.StorenodeChangedEmitter.Emit(m.activeStorenode)
Expand Down Expand Up @@ -363,7 +362,7 @@ func (m *StorenodeCycle) penalizeStorenode(id peer.ID) {
}

func (m *StorenodeCycle) verifyStorenodeStatus(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(storenodeVerificationInterval)
defer ticker.Stop()

for {
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/api/history/emitters.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func (s *Emitter[T]) Emit(value T) {
s.Lock()
defer s.Unlock()

for _, subs := range s.subscriptions {
subs <- value
for _, sub := range s.subscriptions {
sub <- value
}
}

Expand Down
46 changes: 23 additions & 23 deletions waku/v2/protocol/store/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,33 @@ func TestStoreClient(t *testing.T) {

// -- First page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[1].GetTimestamp())
require.Len(t, response.Messages(), 2)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp())
require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[1].GetTimestamp())

err = response.Next(ctx)
require.NoError(t, err)

// -- Second page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[2].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[3].GetTimestamp())
require.Len(t, response.Messages(), 2)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[2].GetTimestamp())
require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[3].GetTimestamp())

err = response.Next(ctx)
require.NoError(t, err)

// -- Third page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 1)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[4].GetTimestamp())
require.Len(t, response.Messages(), 1)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[4].GetTimestamp())

err = response.Next(ctx)
require.NoError(t, err)

// -- Trying to continue a completed cursor
require.True(t, response.IsComplete())
require.Len(t, response.messages, 0)
require.Len(t, response.Messages(), 0)

err = response.Next(ctx)
require.NoError(t, err)
Expand All @@ -165,26 +165,26 @@ func TestStoreClient(t *testing.T) {

// -- First page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[3].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[4].GetTimestamp())
require.Len(t, response.Messages(), 2)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[3].GetTimestamp())
require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[4].GetTimestamp())

err = response.Next(ctx)
require.NoError(t, err)

// -- Second page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[1].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[2].GetTimestamp())
require.Len(t, response.Messages(), 2)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[1].GetTimestamp())
require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[2].GetTimestamp())

err = response.Next(ctx)
require.NoError(t, err)

// -- Third page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 1)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp())
require.Len(t, response.Messages(), 1)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp())

err = response.Next(ctx)
require.NoError(t, err)
Expand All @@ -197,13 +197,13 @@ func TestStoreClient(t *testing.T) {
// No cursor should be returned if there are no messages that match the criteria
response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "no-messages"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 2))
require.NoError(t, err)
require.Len(t, response.messages, 0)
require.Len(t, response.Messages(), 0)
require.Empty(t, response.Cursor())

// If the page size is larger than the number of existing messages, it should not return a cursor
response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 100))
require.NoError(t, err)
require.Len(t, response.messages, 5)
require.Len(t, response.Messages(), 5)
require.Empty(t, response.Cursor())

// Invalid cursors should fail
Expand Down Expand Up @@ -231,11 +231,11 @@ func TestStoreClient(t *testing.T) {
// Should not include data
response, err = wakuStore.Request(ctx, MessageHashCriteria{MessageHashes: []pb.MessageHash{messages[0].Hash(pubsubTopic)}}, IncludeData(false), WithPeer(storenode.ID))
require.NoError(t, err)
require.Len(t, response.messages, 1)
require.Nil(t, response.messages[0].Message)
require.Len(t, response.Messages(), 1)
require.Nil(t, response.Messages()[0].Message)

response, err = wakuStore.Request(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test")}, IncludeData(false))
require.NoError(t, err)
require.GreaterOrEqual(t, len(response.messages), 1)
require.Nil(t, response.messages[0].Message)
require.GreaterOrEqual(t, len(response.Messages()), 1)
require.Nil(t, response.Messages()[0].Message)
}

0 comments on commit f29ce5f

Please sign in to comment.