Skip to content

Commit a2e8ab8

Browse files
committed
prevent non-existent shard from iterating
1 parent 806468a commit a2e8ab8

File tree

1 file changed

+26
-23
lines changed

1 file changed

+26
-23
lines changed

stream.go

+26-23
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ type Subscriber struct {
2121
dynamoSvc DynamoService
2222
streamSvc StreamService
2323

24-
shards sync.Map
24+
shards sync.Map
25+
shardCount int
2526

2627
shardProcessQueue chan *shardProcessContext
2728

@@ -68,9 +69,11 @@ func (r *Subscriber) applyDefaults() {
6869
}
6970

7071
func (r *Subscriber) SetShardSequences(shardSequences []*ShardSequence) {
71-
for _, shardSequence := range shardSequences {
72-
if len(shardSequence.SequenceNumber) > 0 {
73-
r.shards.Store(shardSequence.ShardId, shardSequence.SequenceNumber)
72+
if r.shardCount == 0 {
73+
for _, shardSequence := range shardSequences {
74+
if len(shardSequence.SequenceNumber) > 0 {
75+
r.shards.Store(shardSequence.ShardId, shardSequence.SequenceNumber)
76+
}
7477
}
7578
}
7679
}
@@ -211,37 +214,37 @@ func (r *Subscriber) Subscribe() (<-chan *types.Record, <-chan error) {
211214
r.sendError(err)
212215
continue
213216
}
214-
if first {
215-
r.shards.Range(func(key any, value any) bool {
216-
shardId := key.(string)
217-
sequenceNumber := value.(string)
218-
r.shardProcessQueue <- newShardProcessContext(
219-
&dynamodbstreams.GetShardIteratorInput{
220-
StreamArn: streamArn,
221-
ShardIteratorType: r.shardSequenceIteratorType,
222-
ShardId: &shardId,
223-
SequenceNumber: &sequenceNumber,
224-
},
225-
r.shardIteratorInitialInterval,
226-
r.shardIteratorMaxInterval,
227-
)
228-
return true
229-
})
230-
first = false
231-
}
232217
for _, shard := range shards {
218+
var sequenceNumber *string
219+
shardIteratorType := r.shardIteratorType
220+
221+
if first {
222+
sqn, ok := r.shards.Load(*shard.ShardId)
223+
if ok {
224+
shardIteratorType = r.shardSequenceIteratorType
225+
sequenceNumberStr := sqn.(string)
226+
sequenceNumber = &sequenceNumberStr
227+
r.shards.Delete(*shard.ShardId)
228+
} else {
229+
shardIteratorType = r.shardIteratorType
230+
}
231+
}
232+
233233
if _, exist := r.shards.LoadOrStore(*shard.ShardId, ""); !exist {
234+
r.shardCount++
234235
r.shardProcessQueue <- newShardProcessContext(
235236
&dynamodbstreams.GetShardIteratorInput{
236237
StreamArn: streamArn,
237-
ShardIteratorType: r.shardIteratorType,
238+
ShardIteratorType: shardIteratorType,
238239
ShardId: shard.ShardId,
240+
SequenceNumber: sequenceNumber,
239241
},
240242
r.shardIteratorInitialInterval,
241243
r.shardIteratorMaxInterval,
242244
)
243245
}
244246
}
247+
first = false
245248
}
246249
}
247250
}()

0 commit comments

Comments
 (0)