-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
👌 feat(message:deduplication) implementing the feature #33 #229
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vladshub wouldn't it be easier to de-dup when publishing from the transactional outbox rather than when consuming the message?
if err != nil { | ||
worker.span.LogFields(slog.String("grabbit", "failed processing duplicate")) | ||
worker.log().WithError(err).Error("failed checking for existing message") | ||
return true, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why returning true if you don't know the message really is a duplicate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can't return nil for a bool value so I return true and an error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vladshub I would return the default value of a bool in go which is false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rhinof this way I'm more conservative since I'd rather have this message rejected as a duplicate and go to DLQ or through the error flow then to have processed it twice if I can't ensure deduplication.
_ = worker.reject(false, delivery, msgSpecificLogEntry) | ||
return | ||
} | ||
|
||
isDuplicate, err := worker.handleDuplicates(bm, delivery, msgSpecificLogEntry) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldnt you want to "handle duplicates" in case of global handlers/ dead letter handlers invocation as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the deduplication is on the GbusMessage level in dlq/global we don't yet have the gbusMessage yet
} | ||
|
||
// | ||
func (d *deduper) StoreMessageID(tx *sql.Tx, id string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you consider to store the message in another storage mechanism? like redis etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want to introduce another infrastructure dependency but we can have different storages that would implement the same interface.
@rhinof we can but if we do that we can't guarantee that a service will receive only one since issues with the connection to rabbitmq might create additional duplications and also the time a message is in-flight might provide for additional duplications. |
|
||
var _ deduplicator.Store = &deduper{} | ||
|
||
type deduper struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpicking, the struct is named deduper which is the main struct here yet the file name is tx.go
gbus/worker.go
Outdated
if worker.delicatePolicy == DeduplicationPolicyNone { | ||
return false, nil | ||
} | ||
duplicate, err := worker.duplicateStore.MessageExists(message.IdempotencyKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we be passing in the active transaction ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For message exists there is no real need, we can and we pass in the StoreMessage
but here it brings no to little value
…ay to close the deduplicator
@rhinof & @adiweiss can you please re-review? |
A way to manage message de-duplication in grabbit.
Implementation details:
This PR closes issue #33